diff --git a/libkineto/README.md b/libkineto/README.md index fcbae1b25..d18f1c113 100644 --- a/libkineto/README.md +++ b/libkineto/README.md @@ -73,6 +73,8 @@ For more information on how to run on-demand profiling, please refer to the Dyno The default trace output is a JSON file that can be visualized in Chrome Trace Viewer or Perfetto. The trace output is generated by the `ChromeTraceLogger` instance. The `ChromeTraceLogger` writes to a JSON file using `std::ofstream` in `output_json.cpp` to maximize performance during export. This instance is created by the `ActivityProfilerController` and is stored in the `ActivityLoggerFactory` alongside its protocol. Using this schema, Kineto supports multiple trace output formats. +- Intel XCCL: to enable collection of oneCCL host events, the `INTEL_LIBITTNOTIFY64` environment variable has to be set to the path of `pti_view.so`. + ## Full documentation We strive to keep our source files readable. The best and up-to-date documentation for implementation specifics is available in the source files. 
diff --git a/libkineto/src/plugin/xpupti/XpuptiActivityApi.cpp b/libkineto/src/plugin/xpupti/XpuptiActivityApi.cpp index c934645b5..d25422b32 100644 --- a/libkineto/src/plugin/xpupti/XpuptiActivityApi.cpp +++ b/libkineto/src/plugin/xpupti/XpuptiActivityApi.cpp @@ -260,6 +260,11 @@ void XpuptiActivityApi::enableXpuptiActivities( XPUPTI_CALL(ptiViewEnable(PTI_VIEW_COLLECTION_OVERHEAD)); break; +#if PTI_VERSION_AT_LEAST(0, 17) + case ActivityType::COLLECTIVE_COMM: + XPUPTI_CALL(ptiViewEnable(PTI_VIEW_COMMUNICATION)); + break; +#endif default: break; } @@ -304,6 +309,11 @@ void XpuptiActivityApi::disablePtiActivities( XPUPTI_CALL(ptiViewDisable(PTI_VIEW_COLLECTION_OVERHEAD)); break; +#if PTI_VERSION_AT_LEAST(0, 17) + case ActivityType::COLLECTIVE_COMM: + XPUPTI_CALL(ptiViewDisable(PTI_VIEW_COMMUNICATION)); + break; +#endif default: break; } diff --git a/libkineto/src/plugin/xpupti/XpuptiActivityHandlers.cpp b/libkineto/src/plugin/xpupti/XpuptiActivityHandlers.cpp index 8c2121f11..7c86510e2 100644 --- a/libkineto/src/plugin/xpupti/XpuptiActivityHandlers.cpp +++ b/libkineto/src/plugin/xpupti/XpuptiActivityHandlers.cpp @@ -301,6 +301,38 @@ void XpuptiActivityProfilerSession::handleRuntimeKernelMemcpyMemsetActivities( trace_activity->log(logger); } +#if PTI_VERSION_AT_LEAST(0, 17) +/* Appends a COLLECTIVE_COMM activity named "xccl::<name>" built from a PTI communication record (timestamps, process id, thread id, communicator id) and logs it; a record rejected by outOfRange() is removed from the trace buffer again and is not logged. */ +void XpuptiActivityProfilerSession::handleCommunicationActivity( + const pti_view_record_comms* activity, + ActivityLogger& logger) { + const auto& activity_record = *activity; + const std::string activity_name{activity_record._name != nullptr ? activity_record._name : ""}; /* constructing std::string from a null char* is undefined behavior, so a null name maps to "" */ + const std::string xccl_prefix{"xccl::"}; + const auto record_name = xccl_prefix + activity_name; + + traceBuffer_.span.opCount += 1; + traceBuffer_.emplace_activity(traceBuffer_.span, ActivityType::COLLECTIVE_COMM, record_name); + auto& comms_activity = *(traceBuffer_.activities.back()); + + comms_activity.startTime = activity_record._start_timestamp; + comms_activity.endTime = activity_record._end_timestamp; + comms_activity.device = activity_record._process_id; 
+ comms_activity.resource = activity_record._thread_id; + comms_activity.threadId = activity_record._thread_id; + + comms_activity.addMetadata("Communicator_id", activity_record._communicator_id); + + if (outOfRange(&comms_activity)) { + traceBuffer_.span.opCount -= 1; + traceBuffer_.activities.pop_back(); + return; + } + + comms_activity.log(logger); +} +#endif + void XpuptiActivityProfilerSession::handleOverheadActivity( const pti_view_record_overhead* activity, ActivityLogger& logger) { @@ -375,6 +407,12 @@ void XpuptiActivityProfilerSession::handlePtiActivity( handleOverheadActivity( reinterpret_cast<const pti_view_record_overhead*>(record), logger); break; +#if PTI_VERSION_AT_LEAST(0, 17) + case PTI_VIEW_COMMUNICATION: + handleCommunicationActivity( + reinterpret_cast<const pti_view_record_comms*>(record), logger); + break; +#endif default: errors_.push_back( "Unexpected activity type: " + std::to_string(record->_view_kind)); diff --git a/libkineto/src/plugin/xpupti/XpuptiActivityProfilerSession.h b/libkineto/src/plugin/xpupti/XpuptiActivityProfilerSession.h index 0cca9df87..66f6a6808 100644 --- a/libkineto/src/plugin/xpupti/XpuptiActivityProfilerSession.h +++ b/libkineto/src/plugin/xpupti/XpuptiActivityProfilerSession.h @@ -83,6 +83,9 @@ class XpuptiActivityProfilerSession : public libkineto::IActivityProfilerSession const pti_view_memory_record_type* activity, ActivityLogger& logger); +#if PTI_VERSION_AT_LEAST(0, 17) + void handleCommunicationActivity(const pti_view_record_comms* activity, ActivityLogger& logger); +#endif void handleOverheadActivity(const pti_view_record_overhead* activity, ActivityLogger& logger); void handlePtiActivity(const pti_view_record_base* record, ActivityLogger& logger); diff --git a/libkineto/test/xpupti/CMakeLists.txt b/libkineto/test/xpupti/CMakeLists.txt index faa5751b8..a0d7ff5a5 100644 --- a/libkineto/test/xpupti/CMakeLists.txt +++ b/libkineto/test/xpupti/CMakeLists.txt @@ -19,8 +19,11 @@ add_executable(XpuptiScopeProfilerConfigTest XpuptiScopeProfilerConfigTest.cpp) 
target_link_libraries(XpuptiScopeProfilerConfigTest PRIVATE ${LINK_LIBRARIES}) gtest_discover_tests(XpuptiScopeProfilerConfigTest) -include(ExternalProject) +add_executable(XpuptiActivityHandlersTest XpuptiActivityHandlersTest.cpp) +target_link_libraries(XpuptiActivityHandlersTest PRIVATE ${LINK_LIBRARIES}) +gtest_discover_tests(XpuptiActivityHandlersTest) +include(ExternalProject) function(make_test test_file) get_filename_component(test_name "${test_file}" NAME_WE) set(lib_name "${test_name}Lib") diff --git a/libkineto/test/xpupti/XpuptiActivityHandlersTest.cpp b/libkineto/test/xpupti/XpuptiActivityHandlersTest.cpp new file mode 100644 index 000000000..62ef6d16c --- /dev/null +++ b/libkineto/test/xpupti/XpuptiActivityHandlersTest.cpp @@ -0,0 +1,159 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * All rights reserved. + * + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. + */ + +#include "src/plugin/xpupti/XpuptiActivityApi.h" +#include "src/plugin/xpupti/XpuptiActivityProfilerSession.h" +#include "src/ActivityBuffers.h" +#include "include/output_base.h" + +#include "src/plugin/xpupti/XpuptiProfilerMacros.h" + +#include <gtest/gtest.h> + +namespace KN = KINETO_NAMESPACE; +using namespace libkineto; + +// Mock XpuptiActivityApi that delivers hand-crafted PTI records +// through the virtual processActivities without needing PTI runtime. +class MockXpuptiActivityApi : public KN::XpuptiActivityApi { + public: + std::vector records; + + std::unique_ptr<KN::XpuptiActivityBufferMap> activityBuffers() override { + // Return a non-null map so processTrace enters the processing path. + return std::make_unique<KN::XpuptiActivityBufferMap>(); + } + + const std::pair processActivities( + KN::XpuptiActivityBufferMap&, + std::function handler) override { + for (auto* record : records) { + handler(record); + } + return {static_cast(records.size()), 0}; + } +}; + +// Minimal ActivityLogger that captures logged GenericTraceActivity objects. 
+class MockActivityLogger : public ActivityLogger { + public: + std::vector<const GenericTraceActivity*> logged_activities; + + void handleDeviceInfo(const DeviceInfo&, uint64_t) override {} + void handleResourceInfo(const ResourceInfo&, int64_t) override {} + void handleOverheadInfo(const OverheadInfo&, int64_t) override {} + void handleTraceSpan(const TraceSpan&) override {} + + void handleActivity(const ITraceActivity&) override {} + + void handleGenericActivity(const GenericTraceActivity& activity) override { + logged_activities.push_back(&activity); + } + + void handleTraceStart( + const std::unordered_map&, + const std::string&) override {} + + void finalizeMemoryTrace(const std::string&, const Config&) override {} + + void finalizeTrace( + const Config&, + std::unique_ptr, + int64_t, + std::unordered_map>&) override {} +}; + +class XpuptiActivityHandlersTest : public ::testing::Test { + protected: + MockXpuptiActivityApi mockApi_; + MockActivityLogger logger_; + + // Processes all records in mockApi_ through the handler pipeline + // and returns the resulting trace buffer. 
+ std::unique_ptr processAndGetTrace( + int64_t windowStart = 0, + int64_t windowEnd = 1000) { + Config config; + std::set<ActivityType> activity_types = {ActivityType::COLLECTIVE_COMM, ActivityType::XPU_SYNC}; + auto session = std::make_unique( + mockApi_, "__test_profiler__", config, activity_types); + session->processTrace( + logger_, + [](int64_t) -> const ITraceActivity* { return nullptr; }, + windowStart, + windowEnd); + return session->getTraceBuffer(); + } +}; + +// --- Communication Activity Tests --- + +#if PTI_VERSION_AT_LEAST(0, 17) +TEST_F(XpuptiActivityHandlersTest, CommunicationActivityHasXcclPrefix) { + pti_view_record_comms comms_record{}; + comms_record._view_kind._view_kind = PTI_VIEW_COMMUNICATION; + comms_record._name = "allreduce"; + comms_record._start_timestamp = 100; + comms_record._end_timestamp = 200; + comms_record._process_id = 1; + comms_record._thread_id = 42; + comms_record._communicator_id = 7; + + mockApi_.records.push_back( + reinterpret_cast(&comms_record)); + + auto traceBuffer = processAndGetTrace(); + ASSERT_EQ(traceBuffer->activities.size(), 1u); + + auto& activity = *traceBuffer->activities[0]; + EXPECT_EQ(activity.name(), "xccl::allreduce"); + EXPECT_EQ(activity.type(), ActivityType::COLLECTIVE_COMM); +} + +TEST_F(XpuptiActivityHandlersTest, CommunicationActivityFields) { + pti_view_record_comms comms_record{}; + comms_record._view_kind._view_kind = PTI_VIEW_COMMUNICATION; + comms_record._name = "broadcast"; + comms_record._start_timestamp = 300; + comms_record._end_timestamp = 500; + comms_record._process_id = 10; + comms_record._thread_id = 77; + comms_record._communicator_id = 99; + + mockApi_.records.push_back( + reinterpret_cast(&comms_record)); + + auto traceBuffer = processAndGetTrace(); + ASSERT_EQ(traceBuffer->activities.size(), 1u); + + auto& activity = *traceBuffer->activities[0]; + EXPECT_EQ(activity.timestamp(), 300); + EXPECT_EQ(activity.duration(), 200); + EXPECT_EQ(activity.deviceId(), 10); 
+ EXPECT_EQ(activity.resourceId(), 77); + EXPECT_EQ(activity.getThreadId(), 77); + EXPECT_EQ(activity.getMetadataValue("Communicator_id"), "99"); +} + +TEST_F(XpuptiActivityHandlersTest, CommunicationActivityOutOfRange) { + pti_view_record_comms comms_record{}; + comms_record._view_kind._view_kind = PTI_VIEW_COMMUNICATION; + comms_record._name = "allgather"; + comms_record._start_timestamp = 2000; + comms_record._end_timestamp = 3000; + comms_record._process_id = 1; + comms_record._thread_id = 1; + comms_record._communicator_id = 1; + + mockApi_.records.push_back( + reinterpret_cast(&comms_record)); + + auto traceBuffer = processAndGetTrace(100, 500); + EXPECT_EQ(traceBuffer->activities.size(), 0u); +} +#endif // PTI_VERSION_AT_LEAST(0, 17)