From 682d8245d1f901f9504464b4d403e69ef77f36fb Mon Sep 17 00:00:00 2001 From: Tomasz Socha Date: Mon, 11 May 2026 11:40:11 +0200 Subject: [PATCH 1/2] =?UTF-8?q?=F0=9F=A4=96=20Add=20XCCL=20collective=20co?= =?UTF-8?q?mmunication=20activity=20tracing=20to=20XPU=20plugin?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Enable PTI_VIEW_COMMUNICATION collection in the XPU PTI plugin so oneCCL host-side collective operations show up in Kineto traces. Events are emitted as COLLECTIVE_COMM activities named with an "xccl::" prefix and carry the PTI communicator id as metadata. - Gate new code paths on PTI_VERSION_AT_LEAST(0, 17) - Wire enable/disable of PTI_VIEW_COMMUNICATION in XpuptiActivityApi - Add handleCommunicationActivity for pti_view_record_comms records - Add unit tests covering naming, field mapping, and out-of-range drop - Document INTEL_LIBITTNOTIFY64 requirement in libkineto/README.md --- libkineto/README.md | 2 + .../src/plugin/xpupti/XpuptiActivityApi.cpp | 21 +++ .../plugin/xpupti/XpuptiActivityHandlers.cpp | 37 ++++ .../xpupti/XpuptiActivityProfilerSession.h | 3 + libkineto/test/xpupti/CMakeLists.txt | 5 +- .../xpupti/XpuptiActivityHandlersTest.cpp | 159 ++++++++++++++++++ 6 files changed, 226 insertions(+), 1 deletion(-) create mode 100644 libkineto/test/xpupti/XpuptiActivityHandlersTest.cpp diff --git a/libkineto/README.md b/libkineto/README.md index fcbae1b25..d18f1c113 100644 --- a/libkineto/README.md +++ b/libkineto/README.md @@ -73,6 +73,8 @@ For more information on how to run on-demand profiling, please refer to the Dyno The default trace output is a JSON file that can be visualized in Chrome Trace Viewer or Perfetto. The trace output is generated by the `ChromeTraceLogger` instance. The `ChromeTraceLogger` writes to a JSON file using `std::ofstream` in `output_json.cpp` to maximize performance during export. This instance is created by the `ActivityProfilerController` and is stored in the `ActivityLoggerFactory` alongside its protocol. Using this schema, Kineto supports multiple trace output formats. +- Intel XCCL: to enable collecting of oneCCL host events, `INTEL_LIBITTNOTIFY64` enviroment variable have to be set as path to `pti_view.so` location. + ## Full documentation We strive to keep our source files readable. The best and up-to-date documentation for implementation specifics is available in the source files. diff --git a/libkineto/src/plugin/xpupti/XpuptiActivityApi.cpp b/libkineto/src/plugin/xpupti/XpuptiActivityApi.cpp index c934645b5..9c161d48a 100644 --- a/libkineto/src/plugin/xpupti/XpuptiActivityApi.cpp +++ b/libkineto/src/plugin/xpupti/XpuptiActivityApi.cpp @@ -7,6 +7,7 @@ */ #include "XpuptiActivityApi.h" +#include "Logger.h" #include #include @@ -260,6 +261,16 @@ void XpuptiActivityApi::enableXpuptiActivities( XPUPTI_CALL(ptiViewEnable(PTI_VIEW_COLLECTION_OVERHEAD)); break; +#if PTI_VERSION_AT_LEAST(0, 17) + case ActivityType::COLLECTIVE_COMM: { + auto rc = ptiViewEnable(PTI_VIEW_COMMUNICATION); + if (rc != PTI_SUCCESS) { + LOG(WARNING) << "Failed to enable PTI_VIEW_COMMUNICATION: " + << ptiResultTypeToString(rc); + } + break; + } +#endif default: break; } @@ -304,6 +315,16 @@ void XpuptiActivityApi::disablePtiActivities( XPUPTI_CALL(ptiViewDisable(PTI_VIEW_COLLECTION_OVERHEAD)); break; +#if PTI_VERSION_AT_LEAST(0, 17) + case ActivityType::COLLECTIVE_COMM: { + auto rc = ptiViewDisable(PTI_VIEW_COMMUNICATION); + if (rc != PTI_SUCCESS) { + LOG(WARNING) << "Failed to disable PTI_VIEW_COMMUNICATION: " + << ptiResultTypeToString(rc); + } + break; + } +#endif default: break; } diff --git a/libkineto/src/plugin/xpupti/XpuptiActivityHandlers.cpp b/libkineto/src/plugin/xpupti/XpuptiActivityHandlers.cpp index 8c2121f11..7c86510e2 100644 --- a/libkineto/src/plugin/xpupti/XpuptiActivityHandlers.cpp +++ b/libkineto/src/plugin/xpupti/XpuptiActivityHandlers.cpp @@ -301,6 +301,37 @@ void XpuptiActivityProfilerSession::handleRuntimeKernelMemcpyMemsetActivities( trace_activity->log(logger); } +#if PTI_VERSION_AT_LEAST(0, 17) +void XpuptiActivityProfilerSession::handleCommunicationActivity( + const pti_view_record_comms* activity, + ActivityLogger& logger) { + const auto& activity_record = *activity; + const std::string activity_name{activity_record._name}; + const std::string xccl_prefix{"xccl::"}; + const auto record_name = xccl_prefix + activity_name; + + traceBuffer_.span.opCount += 1; + traceBuffer_.emplace_activity(traceBuffer_.span, ActivityType::COLLECTIVE_COMM, record_name); + auto& comms_activity = *(traceBuffer_.activities.back()); + + comms_activity.startTime = activity_record._start_timestamp; + comms_activity.endTime = activity_record._end_timestamp; + comms_activity.device = activity_record._process_id; + comms_activity.resource = activity_record._thread_id; + comms_activity.threadId = activity_record._thread_id; + + comms_activity.addMetadata("Communicator_id", activity_record._communicator_id); + + if (outOfRange(&comms_activity)) { + traceBuffer_.span.opCount -= 1; + traceBuffer_.activities.pop_back(); + return; + } + + comms_activity.log(logger); +} +#endif + void XpuptiActivityProfilerSession::handleOverheadActivity( const pti_view_record_overhead* activity, ActivityLogger& logger) { @@ -375,6 +406,12 @@ void XpuptiActivityProfilerSession::handlePtiActivity( handleOverheadActivity( reinterpret_cast(record), logger); break; +#if PTI_VERSION_AT_LEAST(0, 17) + case PTI_VIEW_COMMUNICATION: + handleCommunicationActivity( + reinterpret_cast(record), logger); + break; +#endif default: errors_.push_back( "Unexpected activity type: " + std::to_string(record->_view_kind)); diff --git a/libkineto/src/plugin/xpupti/XpuptiActivityProfilerSession.h b/libkineto/src/plugin/xpupti/XpuptiActivityProfilerSession.h index 0cca9df87..66f6a6808 100644 --- a/libkineto/src/plugin/xpupti/XpuptiActivityProfilerSession.h +++ b/libkineto/src/plugin/xpupti/XpuptiActivityProfilerSession.h @@ -83,6 +83,9 @@ class XpuptiActivityProfilerSession : public libkineto::IActivityProfilerSession const pti_view_memory_record_type* activity, ActivityLogger& logger); +#if PTI_VERSION_AT_LEAST(0, 17) + void handleCommunicationActivity(const pti_view_record_comms* activity, ActivityLogger& logger); +#endif void handleOverheadActivity(const pti_view_record_overhead* activity, ActivityLogger& logger); void handlePtiActivity(const pti_view_record_base* record, ActivityLogger& logger); diff --git a/libkineto/test/xpupti/CMakeLists.txt b/libkineto/test/xpupti/CMakeLists.txt index 1bc5595e7..f91dc81ee 100644 --- a/libkineto/test/xpupti/CMakeLists.txt +++ b/libkineto/test/xpupti/CMakeLists.txt @@ -19,8 +19,11 @@ add_executable(XpuptiScopeProfilerConfigTest XpuptiScopeProfilerConfigTest.cpp) target_link_libraries(XpuptiScopeProfilerConfigTest PRIVATE ${LINK_LIBRARIES}) gtest_discover_tests(XpuptiScopeProfilerConfigTest) -include(ExternalProject) +add_executable(XpuptiActivityHandlersTest XpuptiActivityHandlersTest.cpp) +target_link_libraries(XpuptiActivityHandlersTest PRIVATE ${LINK_LIBRARIES}) +gtest_add_tests(TARGET XpuptiActivityHandlersTest) +include(ExternalProject) function(make_test test_file) get_filename_component(test_name "${test_file}" NAME_WE) set(lib_name "${test_name}Lib") diff --git a/libkineto/test/xpupti/XpuptiActivityHandlersTest.cpp b/libkineto/test/xpupti/XpuptiActivityHandlersTest.cpp new file mode 100644 index 000000000..62ef6d16c --- /dev/null +++ b/libkineto/test/xpupti/XpuptiActivityHandlersTest.cpp @@ -0,0 +1,159 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * All rights reserved. + * + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. + */ + +#include "src/plugin/xpupti/XpuptiActivityApi.h" +#include "src/plugin/xpupti/XpuptiActivityProfilerSession.h" +#include "src/ActivityBuffers.h" +#include "include/output_base.h" + +#include "src/plugin/xpupti/XpuptiProfilerMacros.h" + +#include + +namespace KN = KINETO_NAMESPACE; +using namespace libkineto; + +// Mock XpuptiActivityApi that delivers hand-crafted PTI records +// through the virtual processActivities without needing PTI runtime. +class MockXpuptiActivityApi : public KN::XpuptiActivityApi { + public: + std::vector records; + + std::unique_ptr activityBuffers() override { + // Return a non-null map so processTrace enters the processing path. + return std::make_unique(); + } + + const std::pair processActivities( + KN::XpuptiActivityBufferMap&, + std::function handler) override { + for (auto* record : records) { + handler(record); + } + return {static_cast(records.size()), 0}; + } +}; + +// Minimal ActivityLogger that captures logged GenericTraceActivity objects. +class MockActivityLogger : public ActivityLogger { + public: + std::vector logged_activities; + + void handleDeviceInfo(const DeviceInfo&, uint64_t) override {} + void handleResourceInfo(const ResourceInfo&, int64_t) override {} + void handleOverheadInfo(const OverheadInfo&, int64_t) override {} + void handleTraceSpan(const TraceSpan&) override {} + + void handleActivity(const ITraceActivity&) override {} + + void handleGenericActivity(const GenericTraceActivity& activity) override { + logged_activities.push_back(&activity); + } + + void handleTraceStart( + const std::unordered_map&, + const std::string&) override {} + + void finalizeMemoryTrace(const std::string&, const Config&) override {} + + void finalizeTrace( + const Config&, + std::unique_ptr, + int64_t, + std::unordered_map>&) override {} +}; + +class XpuptiActivityHandlersTest : public ::testing::Test { + protected: + MockXpuptiActivityApi mockApi_; + MockActivityLogger logger_; + + // Processes all records in mockApi_ through the handler pipeline + // and returns the resulting trace buffer. + std::unique_ptr processAndGetTrace( + int64_t windowStart = 0, + int64_t windowEnd = 1000) { + Config config; + std::set activity_types = {ActivityType::COLLECTIVE_COMM, ActivityType::XPU_SYNC}; + auto session = std::make_unique( + mockApi_, "__test_profiler__", config, activity_types); + session->processTrace( + logger_, + [](int64_t) -> const ITraceActivity* { return nullptr; }, + windowStart, + windowEnd); + return session->getTraceBuffer(); + } +}; + +// --- Communication Activity Tests --- + +#if PTI_VERSION_AT_LEAST(0, 17) +TEST_F(XpuptiActivityHandlersTest, CommunicationActivityHasXcclPrefix) { + pti_view_record_comms comms_record{}; + comms_record._view_kind._view_kind = PTI_VIEW_COMMUNICATION; + comms_record._name = "allreduce"; + comms_record._start_timestamp = 100; + comms_record._end_timestamp = 200; + comms_record._process_id = 1; + comms_record._thread_id = 42; + comms_record._communicator_id = 7; + + mockApi_.records.push_back( + reinterpret_cast(&comms_record)); + + auto traceBuffer = processAndGetTrace(); + ASSERT_EQ(traceBuffer->activities.size(), 1); + + auto& activity = *traceBuffer->activities[0]; + EXPECT_EQ(activity.name(), "xccl::allreduce"); + EXPECT_EQ(activity.type(), ActivityType::COLLECTIVE_COMM); +} + +TEST_F(XpuptiActivityHandlersTest, CommunicationActivityFields) { + pti_view_record_comms comms_record{}; + comms_record._view_kind._view_kind = PTI_VIEW_COMMUNICATION; + comms_record._name = "broadcast"; + comms_record._start_timestamp = 300; + comms_record._end_timestamp = 500; + comms_record._process_id = 10; + comms_record._thread_id = 77; + comms_record._communicator_id = 99; + + mockApi_.records.push_back( + reinterpret_cast(&comms_record)); + + auto traceBuffer = processAndGetTrace(); + ASSERT_EQ(traceBuffer->activities.size(), 1); + + auto& activity = *traceBuffer->activities[0]; + EXPECT_EQ(activity.timestamp(), 300); + EXPECT_EQ(activity.duration(), 200); + EXPECT_EQ(activity.deviceId(), 10); + EXPECT_EQ(activity.resourceId(), 77); + EXPECT_EQ(activity.getThreadId(), 77); + EXPECT_EQ(activity.getMetadataValue("Communicator_id"), "99"); +} + +TEST_F(XpuptiActivityHandlersTest, CommunicationActivityOutOfRange) { + pti_view_record_comms comms_record{}; + comms_record._view_kind._view_kind = PTI_VIEW_COMMUNICATION; + comms_record._name = "allgather"; + comms_record._start_timestamp = 2000; + comms_record._end_timestamp = 3000; + comms_record._process_id = 1; + comms_record._thread_id = 1; + comms_record._communicator_id = 1; + + mockApi_.records.push_back( + reinterpret_cast(&comms_record)); + + auto traceBuffer = processAndGetTrace(100, 500); + EXPECT_EQ(traceBuffer->activities.size(), 0); +} +#endif // PTI_VERSION_AT_LEAST(0, 17) From 5aeaf26e78e283d6b791f5811771d719e123c0b9 Mon Sep 17 00:00:00 2001 From: Tomasz Socha Date: Tue, 12 May 2026 11:48:54 +0200 Subject: [PATCH 2/2] Switch from WARNING to standard XPUPTI_CALL --- .../src/plugin/xpupti/XpuptiActivityApi.cpp | 19 ++++--------------- 1 file changed, 4 insertions(+), 15 deletions(-) diff --git a/libkineto/src/plugin/xpupti/XpuptiActivityApi.cpp b/libkineto/src/plugin/xpupti/XpuptiActivityApi.cpp index 9c161d48a..d25422b32 100644 --- a/libkineto/src/plugin/xpupti/XpuptiActivityApi.cpp +++ b/libkineto/src/plugin/xpupti/XpuptiActivityApi.cpp @@ -7,7 +7,6 @@ */ #include "XpuptiActivityApi.h" -#include "Logger.h" #include #include @@ -262,14 +261,9 @@ void XpuptiActivityApi::enableXpuptiActivities( break; #if PTI_VERSION_AT_LEAST(0, 17) - case ActivityType::COLLECTIVE_COMM: { - auto rc = ptiViewEnable(PTI_VIEW_COMMUNICATION); - if (rc != PTI_SUCCESS) { - LOG(WARNING) << "Failed to enable PTI_VIEW_COMMUNICATION: " - << ptiResultTypeToString(rc); - } + case ActivityType::COLLECTIVE_COMM: + XPUPTI_CALL(ptiViewEnable(PTI_VIEW_COMMUNICATION)); break; - } #endif default: break; @@ -316,14 +310,9 @@ void XpuptiActivityApi::disablePtiActivities( break; #if PTI_VERSION_AT_LEAST(0, 17) - case ActivityType::COLLECTIVE_COMM: { - auto rc = ptiViewDisable(PTI_VIEW_COMMUNICATION); - if (rc != PTI_SUCCESS) { - LOG(WARNING) << "Failed to disable PTI_VIEW_COMMUNICATION: " - << ptiResultTypeToString(rc); - } + case ActivityType::COLLECTIVE_COMM: + XPUPTI_CALL(ptiViewDisable(PTI_VIEW_COMMUNICATION)); break; - } #endif default: break;