From 027183c82040b0cf0bdea16be108eeef3129ed08 Mon Sep 17 00:00:00 2001 From: slabax Date: Thu, 14 May 2026 15:25:43 +0300 Subject: [PATCH] [xpupti]: emit GPU_USER_ANNOTATION for user record_function ranges The XPU PTI plugin already wires user correlation IDs into PTI via ptiViewPushExternalCorrelationId / PTI_VIEW_EXTERNAL_KIND_CUSTOM_1 and populates userCorrelationMap_ in handleCorrelationActivity(), but it never produced any GPU_USER_ANNOTATION events from that information, so user record_function() ranges did not appear on the device timeline. After a GPU activity (kernel, memcpy, memset) is logged, look up its correlation id in userCorrelationMap_ and resolve the originating CPU activity via the linked-activity callback. The first resolved hit on a given (device, stream, user_corr_id) emplaces a synthesized GPU_USER_ANNOTATION GenericTraceActivity; subsequent hits on the same key widen its [startTime, endTime] so the annotation spans the union of all GPU activities on that stream that share the user correlation id, matching the behavior of GpuUserEventMap::insertOrExtendEvent in the generic profiler. The accumulated annotations are logged once at the end of processTrace and the per-session map is cleared between iterations. The lookup is gated on a successful callback resolution rather than a separate dedup set, so a transient nullptr return (CPU side not yet visible when the first GPU activity is processed) is retried on later GPU activities for the same range. Only emit the synthesized event when the caller requested ActivityType::GPU_USER_ANNOTATION, to avoid producing extra events for clients that do not opt in. Tests ----- Extend RunProfilerTest with optional parameters (userCorrelationId, linkedCpuActivity, linkedActivityCallback) so existing test scaffolding can drive a session that pushes/pops a user correlation id around the XPU workload and resolves it back to a CPU-side activity through the linked-activity callback. Existing tests are unaffected (defaults are 0 / nullptr). Add XpuptiProfilerTest.GpuUserAnnotation, which enables GPU_USER_ANNOTATION alongside the existing GPU/runtime activities, runs the XPU compute helper inside a user correlation range, and asserts that exactly one synthesized "user_function" annotation is emitted per participating GPU stream (memcpy stream and kernel stream). Add XpuptiProfilerTest.GpuUserAnnotationLinkedActivityRetry, which uses a callback that returns nullptr on the first lookup and the linked CPU activity thereafter, and verifies that the plugin still emits one annotation per stream -- pinning down the retry-until-resolved contract on the linked-activity callback. --- .../plugin/xpupti/XpuptiActivityHandlers.cpp | 34 +++++ .../xpupti/XpuptiActivityProfilerSession.cpp | 4 + .../xpupti/XpuptiActivityProfilerSession.h | 4 + libkineto/test/xpupti/XpuptiProfilerTest.cpp | 125 ++++++++++++++++++ libkineto/test/xpupti/XpuptiTestUtilities.cpp | 28 +++- libkineto/test/xpupti/XpuptiTestUtilities.h | 7 +- 6 files changed, 197 insertions(+), 5 deletions(-) diff --git a/libkineto/src/plugin/xpupti/XpuptiActivityHandlers.cpp b/libkineto/src/plugin/xpupti/XpuptiActivityHandlers.cpp index 8c2121f11..103603263 100644 --- a/libkineto/src/plugin/xpupti/XpuptiActivityHandlers.cpp +++ b/libkineto/src/plugin/xpupti/XpuptiActivityHandlers.cpp @@ -9,6 +9,7 @@ #include "XpuptiActivityProfilerSession.h" #include "output_json.h" +#include #include #include @@ -299,6 +300,39 @@ void XpuptiActivityProfilerSession::handleRuntimeKernelMemcpyMemsetActivities( return; } trace_activity->log(logger); + + if constexpr (!handleRuntimeActivities) { + if (activity_types_.count(ActivityType::GPU_USER_ANNOTATION)) { + auto userIt = userCorrelationMap_.find(activity->_correlation_id); + if (userIt != userCorrelationMap_.end() && cpuActivity_) { + const int64_t user_external_id = userIt->second; + const int32_t dev = trace_activity->device; + const int32_t res = trace_activity->resource; + auto key = std::make_tuple(dev, res, user_external_id); + auto annIt = userAnnotationsByStream_.find(key); + if (annIt != userAnnotationsByStream_.end()) { + GenericTraceActivity* ua = annIt->second; + ua->startTime = std::min(ua->startTime, trace_activity->startTime); + ua->endTime = std::max(ua->endTime, trace_activity->endTime); + } else if (const ITraceActivity* cpu_act = + cpuActivity_(user_external_id)) { + traceBuffer_.emplace_activity( + traceBuffer_.span, + ActivityType::GPU_USER_ANNOTATION, + cpu_act->name()); + auto& ua = traceBuffer_.activities.back(); + ua->startTime = trace_activity->startTime; + ua->endTime = trace_activity->endTime; + ua->device = dev; + ua->resource = res; + ua->id = user_external_id; + ua->threadId = trace_activity->threadId; + ua->linked = cpu_act; + userAnnotationsByStream_.emplace(key, ua.get()); + } + } + } + } } void XpuptiActivityProfilerSession::handleOverheadActivity( diff --git a/libkineto/src/plugin/xpupti/XpuptiActivityProfilerSession.cpp b/libkineto/src/plugin/xpupti/XpuptiActivityProfilerSession.cpp index bf60c2d3e..a27361256 100644 --- a/libkineto/src/plugin/xpupti/XpuptiActivityProfilerSession.cpp +++ b/libkineto/src/plugin/xpupti/XpuptiActivityProfilerSession.cpp @@ -92,6 +92,10 @@ void XpuptiActivityProfilerSession::processTrace(ActivityLogger& logger) { handlePtiActivity(record, logger); }); } + for (auto& kv : userAnnotationsByStream_) { + kv.second->log(logger); + } + userAnnotationsByStream_.clear(); } void XpuptiActivityProfilerSession::processTrace( diff --git a/libkineto/src/plugin/xpupti/XpuptiActivityProfilerSession.h b/libkineto/src/plugin/xpupti/XpuptiActivityProfilerSession.h index 0cca9df87..a871e47c0 100644 --- a/libkineto/src/plugin/xpupti/XpuptiActivityProfilerSession.h +++ b/libkineto/src/plugin/xpupti/XpuptiActivityProfilerSession.h @@ -15,8 +15,10 @@ #include +#include #include #include +#include #include #include #include @@ -107,6 +109,8 @@ class XpuptiActivityProfilerSession : public libkineto::IActivityProfilerSession std::unordered_map cpuCorrelationMap_; std::unordered_map userCorrelationMap_; std::unordered_map correlatedPtiActivities_; + std::map, + libkineto::GenericTraceActivity*> userAnnotationsByStream_; std::vector errors_; libkineto::getLinkedActivityCallback cpuActivity_; diff --git a/libkineto/test/xpupti/XpuptiProfilerTest.cpp b/libkineto/test/xpupti/XpuptiProfilerTest.cpp index 616583a11..9150e07b3 100644 --- a/libkineto/test/xpupti/XpuptiProfilerTest.cpp +++ b/libkineto/test/xpupti/XpuptiProfilerTest.cpp @@ -9,11 +9,125 @@ #include "XpuptiTestUtilities.h" #include "include/libkineto.h" +#include "include/GenericTraceActivity.h" #include +#include + namespace KN = KINETO_NAMESPACE; +namespace { + +constexpr int64_t kUserCorrId = 0xC0FFEE; + +enum class LinkedActivityMode { + AlwaysLinked, + MissFirstThenLinked, +}; + +std::vector expectedGpuUserAnnotationActivities() { + return { + "urEnqueueMemBufferWrite", + "urEnqueueMemBufferWrite", + "urEnqueueMemBufferWrite", + "urEnqueueKernelLaunch", + "urEnqueueMemBufferRead", + "Memcpy M2D", + "Memcpy M2D", + "Memcpy M2D", + "Run(sycl::_V1::queue, ...)", + "Memcpy D2M", + "user_function", + "user_function"}; +} + +std::vector expectedGpuUserAnnotationTypes() { + return { + "xpu_runtime", + "xpu_runtime", + "xpu_runtime", + "xpu_runtime", + "xpu_runtime", + "gpu_memcpy", + "gpu_memcpy", + "gpu_memcpy", + "kernel", + "gpu_memcpy", + "gpu_user_annotation", + "gpu_user_annotation"}; +} + +std::unique_ptr runGpuUserAnnotationCase( + LinkedActivityMode mode) { + KN::Config cfg; + std::vector metrics; + + std::set activities{ + KN::ActivityType::GPU_MEMCPY, + KN::ActivityType::CONCURRENT_KERNEL, + KN::ActivityType::XPU_RUNTIME, + KN::ActivityType::EXTERNAL_CORRELATION, + KN::ActivityType::GPU_USER_ANNOTATION, + }; + + auto expectedActivities = expectedGpuUserAnnotationActivities(); + auto expectedTypes = expectedGpuUserAnnotationTypes(); + + static const KN::TraceSpan kCpuSpan(0, 0, "cpu_span", ""); + KN::GenericTraceActivity cpuAct( + kCpuSpan, KN::ActivityType::CPU_OP, "user_function"); + cpuAct.id = kUserCorrId; + + bool firstLookup = true; + auto linkedActivityCallback = + [&cpuAct, &firstLookup, mode](int32_t corr) + -> const KN::ITraceActivity* { + if (corr != kUserCorrId) { + return nullptr; + } + if (mode == LinkedActivityMode::MissFirstThenLinked && firstLookup) { + firstLookup = false; + return nullptr; + } + return &cpuAct; + }; + + constexpr unsigned repeatCount = 1; + [[maybe_unused]] auto [pSession, pBuffer] = RunProfilerTest( + metrics, + activities, + cfg, + repeatCount, + std::move(expectedActivities), + std::move(expectedTypes), + kUserCorrId, + &cpuAct, + linkedActivityCallback); + + return std::move(pBuffer); +} + +void expectTwoUserAnnotations(const KN::CpuTraceBuffer& buffer) { + std::set> streamKeys; + unsigned annotationCount = 0; + for (const auto& activity : buffer.activities) { + if (activity->type() != KN::ActivityType::GPU_USER_ANNOTATION) { + continue; + } + ++annotationCount; + EXPECT_EQ(activity->correlationId(), kUserCorrId); + ASSERT_NE(activity->linkedActivity(), nullptr); + EXPECT_EQ(activity->linkedActivity()->name(), "user_function"); + streamKeys.insert({activity->deviceId(), activity->resourceId()}); + } + + EXPECT_EQ(annotationCount, 2); + EXPECT_EQ(streamKeys.size(), 2); +} + +} // namespace + TEST(XpuptiProfilerTest, XpuDriverEvents) { KN::Config cfg; @@ -117,3 +231,14 @@ TEST(XpuptiProfilerTest, TestEvents) { << ", resourceId=" << resourceInfos[i].id << " never used."; } } + +TEST(XpuptiProfilerTest, GpuUserAnnotation) { + auto pBuffer = runGpuUserAnnotationCase(LinkedActivityMode::AlwaysLinked); + expectTwoUserAnnotations(*pBuffer); +} + +TEST(XpuptiProfilerTest, GpuUserAnnotationLinkedActivityRetry) { + auto pBuffer = + runGpuUserAnnotationCase(LinkedActivityMode::MissFirstThenLinked); + expectTwoUserAnnotations(*pBuffer); +} diff --git a/libkineto/test/xpupti/XpuptiTestUtilities.cpp b/libkineto/test/xpupti/XpuptiTestUtilities.cpp index a72d0a997..cb72540ce 100644 --- a/libkineto/test/xpupti/XpuptiTestUtilities.cpp +++ b/libkineto/test/xpupti/XpuptiTestUtilities.cpp @@ -172,7 +172,10 @@ RunProfilerTest( const KN::Config& cfg, unsigned repeatCount, std::vector&& expectedActivities, - std::vector&& expectedTypes) { + std::vector&& expectedTypes, + int64_t userCorrelationId, + const KN::ITraceActivity* linkedCpuActivity, + std::function linkedActivityCallback) { KN::XPUActivityProfiler profiler; EXPECT_TRUE(profiler.name() == "__xpu_profiler__"); @@ -183,17 +186,34 @@ RunProfilerTest( .count(); pSession->start(); + if (userCorrelationId != 0) { + pSession->pushUserCorrelationId( + static_cast(userCorrelationId)); + } + void ComputeOnXpu(unsigned size, unsigned repeatCount); ComputeOnXpu(1024, repeatCount); + if (userCorrelationId != 0) { + pSession->popUserCorrelationId(); + } + pSession->stop(); int64_t endTime = std::chrono::duration_cast( std::chrono::system_clock::now().time_since_epoch()) .count(); - auto getLinkedActivity = [](int32_t) -> const KN::ITraceActivity* { - return nullptr; - }; + auto getLinkedActivity = std::move(linkedActivityCallback); + if (!getLinkedActivity) { + getLinkedActivity = + [userCorrelationId, linkedCpuActivity]( + int32_t corr) -> const KN::ITraceActivity* { + if (userCorrelationId != 0 && corr == userCorrelationId) { + return linkedCpuActivity; + } + return nullptr; + }; + } TestActivityLogger logger; pSession->processTrace(logger, getLinkedActivity, startTime, endTime); diff --git a/libkineto/test/xpupti/XpuptiTestUtilities.h b/libkineto/test/xpupti/XpuptiTestUtilities.h index 17f5e168d..e544a3141 100644 --- a/libkineto/test/xpupti/XpuptiTestUtilities.h +++ b/libkineto/test/xpupti/XpuptiTestUtilities.h @@ -8,6 +8,8 @@ #include "include/output_base.h" +#include + namespace KN = KINETO_NAMESPACE; bool IsEnvVerbose(); @@ -18,4 +20,7 @@ std::pair, std::unique_ptr&& expectedActivities, - std::vector&& expectedTypes); + std::vector&& expectedTypes, + int64_t userCorrelationId = 0, + const KN::ITraceActivity* linkedCpuActivity = nullptr, + std::function linkedActivityCallback = nullptr);