diff --git a/libkineto/CMakeLists.txt b/libkineto/CMakeLists.txt index f17f8b072..519b7f36e 100644 --- a/libkineto/CMakeLists.txt +++ b/libkineto/CMakeLists.txt @@ -205,6 +205,14 @@ elseif(KINETO_BACKEND STREQUAL "rocm") endif() target_compile_definitions(kineto_base PRIVATE "__HIP_PLATFORM_HCC__") target_compile_definitions(kineto_base PRIVATE "__HIP_PLATFORM_AMD__") + # The rocprofiler-sdk and HSA system headers (anonymous structs, flexible + # array members, etc.) trip -Werror=pedantic when callers (e.g. PyTorch's + # cmake/Dependencies.cmake) propagate -Wpedantic to us. Silence pedantic for + # the kineto sources only; the headers themselves come from /opt/rocm and + # we can't change them. + if(CMAKE_CXX_COMPILER_ID MATCHES "GNU|Clang") + list(APPEND KINETO_COMPILE_OPTIONS "-Wno-pedantic") + endif() elseif(KINETO_BACKEND STREQUAL "xpu") list(APPEND KINETO_COMPILE_OPTIONS ${XPUPTI_BUILD_FLAG}) if(KINETO_BUILD_TESTS) diff --git a/libkineto/src/RocLogger.h b/libkineto/src/RocLogger.h index a59ffada5..61f23599e 100644 --- a/libkineto/src/RocLogger.h +++ b/libkineto/src/RocLogger.h @@ -69,6 +69,8 @@ typedef enum { ROCTRACER_ACTIVITY_COPY, ROCTRACER_ACTIVITY_MALLOC, ROCTRACER_ACTIVITY_ASYNC, + ROCTRACER_ACTIVITY_EVENT_RECORD, + ROCTRACER_ACTIVITY_SYNC, ROCTRACER_ACTIVITY_NONE } rocprof_activity_types; @@ -213,3 +215,55 @@ struct rocprofAsyncRow : public rocprofBase { uint64_t queue; std::string kernelName; }; + +enum rocprofSyncType { + ROCPROF_SYNC_STREAM_WAIT_EVENT = 0, + ROCPROF_SYNC_EVENT_SYNCHRONIZE, + ROCPROF_SYNC_STREAM_SYNCHRONIZE, + ROCPROF_SYNC_DEVICE_SYNCHRONIZE, +}; + +struct rocprofEventRecordRow : public rocprofRow { + rocprofEventRecordRow(uint64_t id, + uint32_t domain, + uint32_t cid, + uint32_t pid, + uint32_t tid, + uint64_t begin, + uint64_t end, + hipEvent_t event, + hipStream_t stream) + : rocprofRow(id, domain, cid, pid, tid, begin, end, + ROCTRACER_ACTIVITY_EVENT_RECORD), + event(event), + stream(stream) {} + hipEvent_t event; + hipStream_t stream; +}; + +struct rocprofSyncRow : public rocprofRow { + rocprofSyncRow(uint64_t id, + uint32_t domain, + uint32_t cid, + uint32_t pid, + uint32_t tid, + uint64_t begin, + uint64_t end, + rocprofSyncType syncType, + hipStream_t stream, + hipEvent_t event, + hipStream_t srcStream, + uint64_t srcCorrId) + : rocprofRow(id, domain, cid, pid, tid, begin, end, + ROCTRACER_ACTIVITY_SYNC), + syncType(syncType), + stream(stream), + event(event), + srcStream(srcStream), + srcCorrId(srcCorrId) {} + rocprofSyncType syncType; + hipStream_t stream; + hipEvent_t event; + hipStream_t srcStream; + uint64_t srcCorrId; +}; diff --git a/libkineto/src/RocmActivityProfiler.cpp b/libkineto/src/RocmActivityProfiler.cpp index 120d5b380..ddbd03eb0 100644 --- a/libkineto/src/RocmActivityProfiler.cpp +++ b/libkineto/src/RocmActivityProfiler.cpp @@ -162,6 +162,12 @@ void RocmActivityProfiler::popCorrelationIdImpl(CorrelationFlowType type) { void RocmActivityProfiler::onResetTraceData() { roc_.teardownContext(); +#ifndef ROCTRACER_FALLBACK + // Drop any hipEvent_t -> {stream, corrId} entries left over from the prior + // profiling session so they cannot be returned as the producer of a wait + // recorded in the next session. Mirrors CuptiActivityProfiler::onResetTraceData. + RocprofLogger::clearEventMap(); +#endif } void RocmActivityProfiler::onFinalizeTrace( @@ -273,6 +279,14 @@ void RocmActivityProfiler::handleRocprofActivity( handleGpuActivity( reinterpret_cast(record), logger); break; + case ROCTRACER_ACTIVITY_EVENT_RECORD: + handleRuntimeActivity( + reinterpret_cast(record), logger); + break; + case ROCTRACER_ACTIVITY_SYNC: + handleRuntimeActivity( + reinterpret_cast(record), logger); + break; case ROCTRACER_ACTIVITY_NONE: default: LOG(WARNING) << "Unexpected activity type: " << record->type; diff --git a/libkineto/src/RocprofActivity_inl.h b/libkineto/src/RocprofActivity_inl.h index bb0dcc6e4..d46f45d45 100644 --- a/libkineto/src/RocprofActivity_inl.h +++ b/libkineto/src/RocprofActivity_inl.h @@ -239,6 +239,79 @@ inline const std::string RuntimeActivity::metadataJson() const raw().ptr); } +template <> +inline const std::string RuntimeActivity::metadataJson() + const { + return fmt::format( + R"JSON( + "cid": {}, "correlation": {}, + "hip_event": "{}", "hip_stream": "{}")JSON", + raw().cid, + raw().id, + fmt::ptr(raw().event), + fmt::ptr(raw().stream)); +} + +template <> +inline const std::string RuntimeActivity::metadataJson() const { + static const char* syncTypeNames[] = { + "stream_wait_event", + "event_synchronize", + "stream_synchronize", + "device_synchronize", + }; + const char* syncName = (raw().syncType >= 0 && raw().syncType <= 3) + ? syncTypeNames[raw().syncType] + : "unknown"; + + std::string meta = fmt::format( + R"JSON( + "cid": {}, "correlation": {}, + "sync_type": "{}")JSON", + raw().cid, + raw().id, + syncName); + + meta += fmt::format( + R"JSON(, + "hip_stream": "{}")JSON", + fmt::ptr(raw().stream)); + if (raw().event) { + meta += fmt::format( + R"JSON(, + "hip_event": "{}")JSON", + fmt::ptr(raw().event)); + } + // Inter-stream dependency metadata: emitted for sync types that wait on a + // specific hipEvent_t (stream wait event, event synchronize) whenever the + // event was resolved against a prior hipEventRecord in g_eventMap. Field + // names mirror CUPTI's `wait_on_*` keys for CuptiActivityProfiler parity: + // + // wait_on_stream <=> CUPTI wait_on_stream + // wait_on_hip_event_record_corr_id <=> CUPTI wait_on_cuda_event_record_corr_id + // wait_on_hip_event_id <=> CUPTI wait_on_cuda_event_id + // + // The last field reports the hipEvent_t handle the wait was issued against, + // independent of whether a producer record was found. + if ((raw().syncType == ROCPROF_SYNC_STREAM_WAIT_EVENT || + raw().syncType == ROCPROF_SYNC_EVENT_SYNCHRONIZE) && + raw().event) { + meta += fmt::format( + R"JSON(, + "wait_on_hip_event_id": "{}")JSON", + fmt::ptr(raw().event)); + if (raw().srcCorrId) { + meta += fmt::format( + R"JSON(, + "wait_on_stream": "{}", + "wait_on_hip_event_record_corr_id": {})JSON", + fmt::ptr(raw().srcStream), + raw().srcCorrId); + } + } + return meta; +} + template inline const std::string RuntimeActivity::metadataJson() const { return fmt::format( diff --git a/libkineto/src/RocprofLogger.cpp b/libkineto/src/RocprofLogger.cpp index 24a8b34bf..f54ec01b5 100644 --- a/libkineto/src/RocprofLogger.cpp +++ b/libkineto/src/RocprofLogger.cpp @@ -17,9 +17,11 @@ #include #include +#include #include #include #include +#include #include "ApproximateClock.h" #include "Demangle.h" @@ -250,6 +252,101 @@ bool isMallocApi(uint32_t id) { return false; } +bool isEventRecordApi(uint32_t id) { + switch (id) { + case ROCPROFILER_HIP_RUNTIME_API_ID_hipEventRecord: + case ROCPROFILER_HIP_RUNTIME_API_ID_hipEventRecord_spt: +#ifdef ROCPROFILER_HIP_RUNTIME_API_ID_hipEventRecordWithFlags + case ROCPROFILER_HIP_RUNTIME_API_ID_hipEventRecordWithFlags: +#endif + return true; + break; + default:; + } + return false; +} + +bool isSyncApi(uint32_t id) { + switch (id) { + case ROCPROFILER_HIP_RUNTIME_API_ID_hipStreamWaitEvent: + case ROCPROFILER_HIP_RUNTIME_API_ID_hipStreamWaitEvent_spt: + case ROCPROFILER_HIP_RUNTIME_API_ID_hipEventSynchronize: + case ROCPROFILER_HIP_RUNTIME_API_ID_hipStreamSynchronize: + case ROCPROFILER_HIP_RUNTIME_API_ID_hipStreamSynchronize_spt: + case ROCPROFILER_HIP_RUNTIME_API_ID_hipDeviceSynchronize: + return true; + break; + default:; + } + return false; +} + +struct event_record_args { + hipEvent_t event{nullptr}; + hipStream_t stream{nullptr}; +}; +auto extract_event_record_args = + []([[maybe_unused]] rocprofiler_callback_tracing_kind_t kind, + [[maybe_unused]] rocprofiler_tracing_operation_t operation, + [[maybe_unused]] uint32_t arg_num, + const void* const arg_value_addr, + [[maybe_unused]] int32_t indirection_count, + [[maybe_unused]] const char* arg_type, + const char* arg_name, + [[maybe_unused]] const char* arg_value_str, + [[maybe_unused]] int32_t dereference_count, + void* cb_data) -> int { + auto& args = *(static_cast(cb_data)); + if (strcmp("event", arg_name) == 0) + args.event = *(reinterpret_cast(arg_value_addr)); + else if (strcmp("stream", arg_name) == 0) + args.stream = *(reinterpret_cast(arg_value_addr)); + return 0; +}; + +struct sync_args { + hipStream_t stream{nullptr}; + hipEvent_t event{nullptr}; +}; +auto extract_sync_args = + []([[maybe_unused]] rocprofiler_callback_tracing_kind_t kind, + [[maybe_unused]] rocprofiler_tracing_operation_t operation, + [[maybe_unused]] uint32_t arg_num, + const void* const arg_value_addr, + [[maybe_unused]] int32_t indirection_count, + [[maybe_unused]] const char* arg_type, + const char* arg_name, + [[maybe_unused]] const char* arg_value_str, + [[maybe_unused]] int32_t dereference_count, + void* cb_data) -> int { + auto& args = *(static_cast(cb_data)); + if (strcmp("stream", arg_name) == 0) + args.stream = *(reinterpret_cast(arg_value_addr)); + else if (strcmp("event", arg_name) == 0) + args.event = *(reinterpret_cast(arg_value_addr)); + return 0; +}; + +// Maps hipEvent_t -> sorted vector of {hipStream_t, correlation_id} for every +// hipEventRecord observed on that event handle. +// +// HIP event handles are reused: applications routinely call hipEventRecord on +// the same hipEvent_t many times across the trace. Storing only the most +// recent record (single-entry map) gives wrong attribution when an earlier +// hipStreamWaitEvent fires its callback after a later hipEventRecord (cudaEvent +// callbacks are not guaranteed in-order across streams). +// +// The vector is kept sorted by correlationId so that a hipStreamWaitEvent +// callback can binary-search for the most recent record whose correlationId is +// strictly less than its own (i.e., the producer record it actually waits on). +// This mirrors CUPTI's `waitEventMap` design in CuptiActivityProfiler.cpp. +struct EventMapEntry { + hipStream_t stream{nullptr}; + uint64_t corrId{0}; +}; +std::mutex g_eventMapMutex; +std::unordered_map> g_eventMap; + class RocprofApiIdList : public ApiIdList { public: RocprofApiIdList(callback_name_info& names); @@ -515,6 +612,55 @@ void RocprofLogger::popCorrelationID(CorrelationDomain type) { } } +void RocprofLogger::recordEvent( + void* event, + void* stream, + uint64_t corrId) { + std::lock_guard lock(g_eventMapMutex); + auto& vec = g_eventMap[event]; + EventMapEntry entry{static_cast(stream), corrId}; + auto pos = std::lower_bound( + vec.begin(), + vec.end(), + entry.corrId, + [](const EventMapEntry& a, uint64_t val) { return a.corrId < val; }); + vec.insert(pos, entry); +} + +bool RocprofLogger::resolveWait( + void* event, + uint64_t queryCorrId, + void** outStream, + uint64_t* outCorrId) { + std::lock_guard lock(g_eventMapMutex); + auto it = g_eventMap.find(event); + if (it == g_eventMap.end()) { + return false; + } + const auto& vec = it->second; + auto pos = std::upper_bound( + vec.begin(), + vec.end(), + queryCorrId, + [](uint64_t val, const EventMapEntry& a) { return val < a.corrId; }); + if (pos == vec.begin()) { + return false; + } + auto prev = std::prev(pos); + if (outStream) { + *outStream = static_cast(prev->stream); + } + if (outCorrId) { + *outCorrId = prev->corrId; + } + return true; +} + +void RocprofLogger::clearEventMap() { + std::lock_guard lock(g_eventMapMutex); + g_eventMap.clear(); +} + void RocprofLogger::clearLogs() { // CuptiActivityProfiler clears this before the output Loggers use the data // for (auto &row : rows_) @@ -672,6 +818,93 @@ void RocprofLogger::api_callback( args.size); insert_row_to_buffer(row); } + // Event Record + else if (isEventRecordApi(record.operation)) { + event_record_args args; + rocprofiler_iterate_callback_tracing_kind_operation_args( + record, extract_event_record_args, 1, &args); + + recordEvent( + static_cast(args.event), + static_cast(args.stream), + record.correlation_id.internal); + + rocprofEventRecordRow* row = new rocprofEventRecordRow( + record.correlation_id.internal, + record.kind, + record.operation, + processId(), + systemThreadId(), + startTime, + endTime, + args.event, + args.stream); + insert_row_to_buffer(row); + } + // Sync APIs (stream wait event, event sync, stream sync, device sync) + else if (isSyncApi(record.operation)) { + sync_args args; + rocprofiler_iterate_callback_tracing_kind_operation_args( + record, extract_sync_args, 1, &args); + + rocprofSyncType syncType; + hipStream_t srcStream = nullptr; + uint64_t srcCorrId = 0; + + switch (record.operation) { + case ROCPROFILER_HIP_RUNTIME_API_ID_hipStreamWaitEvent: + case ROCPROFILER_HIP_RUNTIME_API_ID_hipStreamWaitEvent_spt: + syncType = ROCPROF_SYNC_STREAM_WAIT_EVENT; + break; + case ROCPROFILER_HIP_RUNTIME_API_ID_hipEventSynchronize: + syncType = ROCPROF_SYNC_EVENT_SYNCHRONIZE; + break; + case ROCPROFILER_HIP_RUNTIME_API_ID_hipStreamSynchronize: + case ROCPROFILER_HIP_RUNTIME_API_ID_hipStreamSynchronize_spt: + syncType = ROCPROF_SYNC_STREAM_SYNCHRONIZE; + break; + case ROCPROFILER_HIP_RUNTIME_API_ID_hipDeviceSynchronize: + default: + syncType = ROCPROF_SYNC_DEVICE_SYNCHRONIZE; + break; + } + + // For sync types that wait on a specific hipEvent_t (stream wait event + // and event synchronize), look up the most recent hipEventRecord on + // that event whose correlationId strictly precedes this sync's, so we + // can emit the producer stream + corr_id in the trace metadata. + // Matches CUPTI's isEventSync() handling for STREAM_WAIT_EVENT and + // EVENT_SYNCHRONIZE. + if ((syncType == ROCPROF_SYNC_STREAM_WAIT_EVENT || + syncType == ROCPROF_SYNC_EVENT_SYNCHRONIZE) && + args.event != nullptr) { + void* resolvedStream = nullptr; + uint64_t resolvedCorrId = 0; + if (resolveWait( + static_cast(args.event), + record.correlation_id.internal, + &resolvedStream, + &resolvedCorrId)) { + srcStream = static_cast(resolvedStream); + srcCorrId = resolvedCorrId; + } + } + + rocprofSyncRow* row = new rocprofSyncRow( + record.correlation_id.internal, + record.kind, + record.operation, + processId(), + systemThreadId(), + startTime, + endTime, + syncType, + args.stream, + args.event, + srcStream, + srcCorrId); + insert_row_to_buffer(row); + } // Default Records else { struct { hipStream_t stream{nullptr}; } default_args; diff --git a/libkineto/src/RocprofLogger.h b/libkineto/src/RocprofLogger.h index a13d8d0c9..8fe331d54 100644 --- a/libkineto/src/RocprofLogger.h +++ b/libkineto/src/RocprofLogger.h @@ -37,6 +37,30 @@ class RocprofLogger { static void pushCorrelationID(uint64_t id, RocLogger::CorrelationDomain type); static void popCorrelationID(RocLogger::CorrelationDomain type); + // Insert a hipEventRecord observation into the global hipEvent_t -> + // sorted vector<{stream, correlationId}> map. Called from the + // hipEventRecord callback to remember the producer for a later + // hipStreamWaitEvent / hipEventSynchronize lookup. Test fixtures may + // also call this directly to seed the map without going through the + // real ROCm tracing path. + static void recordEvent(void* event, void* stream, uint64_t corrId); + + // Look up the most recent hipEventRecord observation for `event` whose + // correlationId is strictly less than `queryCorrId`. On a successful + // lookup, sets *outStream / *outCorrId and returns true. Returns false + // when the event has no record (or no record older than the query). + static bool resolveWait( + void* event, + uint64_t queryCorrId, + void** outStream, + uint64_t* outCorrId); + + // Clears the global hipEvent_t -> {stream, correlationId} map populated by + // hipEventRecord callbacks. Must be called between profiling sessions + // (typically from RocmActivityProfiler::onResetTraceData) to prevent stale + // entries from a previous trace polluting the next one. + static void clearEventMap(); + static void ensureRegistered(); void startLogging(); void stopLogging(); diff --git a/libkineto/test/RocmActivityProfilerTest.cpp b/libkineto/test/RocmActivityProfilerTest.cpp index c8a3975dd..02403cd04 100644 --- a/libkineto/test/RocmActivityProfilerTest.cpp +++ b/libkineto/test/RocmActivityProfilerTest.cpp @@ -310,6 +310,97 @@ struct MockRocLogger { activities_.push_back(row); } +#ifndef ROCTRACER_FALLBACK + // Adds a hipEventRecord activity AND populates RocprofLogger's global + // hipEvent_t -> {stream, corrId} map (just like the real api_callback + // would). New tests should prefer addSyncActivityResolvingFromMap() so + // the JSON output goes through the production lookup path. + void addEventRecordActivity( + int64_t start_ns, + int64_t end_ns, + int64_t correlation, + hipEvent_t event, + hipStream_t stream) { + rocprofEventRecordRow* row = new rocprofEventRecordRow( + correlation, + RUNTIME_DOMAIN, + ROCPROFILER_HIP_RUNTIME_API_ID_hipEventRecord, + processId(), + systemThreadId(), + start_ns, + end_ns, + event, + stream); + activities_.push_back(row); + RocprofLogger::recordEvent( + static_cast(event), + static_cast(stream), + static_cast(correlation)); + } + + // Pre-resolved variant: pass producer (srcStream, srcCorrId) directly. + // Used by tests that already know what the lookup should return. + void addSyncActivity( + int64_t start_ns, + int64_t end_ns, + int64_t correlation, + rocprofSyncType syncType, + hipStream_t stream, + hipEvent_t event, + hipStream_t srcStream, + uint64_t srcCorrId) { + uint32_t apiId = + (syncType == ROCPROF_SYNC_EVENT_SYNCHRONIZE) + ? ROCPROFILER_HIP_RUNTIME_API_ID_hipEventSynchronize + : ROCPROFILER_HIP_RUNTIME_API_ID_hipStreamWaitEvent; + rocprofSyncRow* row = new rocprofSyncRow( + correlation, + RUNTIME_DOMAIN, + apiId, + processId(), + systemThreadId(), + start_ns, + end_ns, + syncType, + stream, + event, + srcStream, + srcCorrId); + activities_.push_back(row); + } + + // Lookup-driven variant: queries RocprofLogger's global event map for + // the producer record matching `event` whose correlationId is < `correlation`, + // matching the production api_callback path. Tests B1 (vector + upper_bound) + // and B3 (event sync resolution) end-to-end through the JSON output. + void addSyncActivityResolvingFromMap( + int64_t start_ns, + int64_t end_ns, + int64_t correlation, + rocprofSyncType syncType, + hipStream_t stream, + hipEvent_t event) { + hipStream_t srcStream = nullptr; + uint64_t srcCorrId = 0; + if (syncType == ROCPROF_SYNC_STREAM_WAIT_EVENT || + syncType == ROCPROF_SYNC_EVENT_SYNCHRONIZE) { + void* resolvedStream = nullptr; + uint64_t resolvedCorrId = 0; + if (RocprofLogger::resolveWait( + static_cast(event), + static_cast(correlation), + &resolvedStream, + &resolvedCorrId)) { + srcStream = static_cast(resolvedStream); + srcCorrId = resolvedCorrId; + } + } + addSyncActivity( + start_ns, end_ns, correlation, syncType, stream, event, srcStream, + srcCorrId); + } +#endif + ~MockRocLogger() { while (!activities_.empty()) { auto act = activities_.back(); @@ -1044,3 +1135,335 @@ TEST_F(RocmActivityProfilerTest, JsonGPUIDSortTest) { } #endif } + +#ifndef ROCTRACER_FALLBACK +TEST_F(RocmActivityProfilerTest, InterStreamDependencyTest) { + std::vector log_modules({"RocmActivityProfiler.cpp"}); + SET_LOG_VERBOSITY_LEVEL(2, log_modules); + + RocmActivityProfiler profiler(rocActivities_, /*cpu only*/ false); + int64_t start_time_ns = + libkineto::timeSinceEpoch(std::chrono::system_clock::now()); + int64_t duration_ns = 300; + auto start_time = time_point(nanoseconds(start_time_ns)); + profiler.configure(*cfg_, start_time); + profiler.startTrace(start_time); + profiler.stopTrace(start_time + nanoseconds(duration_ns)); + profiler.recordThreadInfo(); + + auto cpuOps = std::make_unique( + start_time_ns, start_time_ns + duration_ns); + cpuOps->addOp("op1", start_time_ns + 10, start_time_ns + 30, 1); + profiler.transferCpuTrace(std::move(cpuOps)); + + // Simulate: hipEventRecord(event=0xA, stream=0x1) with corr=10 + // then hipStreamWaitEvent(stream=0x2, event=0xA) with corr=20 + auto gpuOps = std::make_unique(); + hipEvent_t fakeEvent = reinterpret_cast(0xA); + hipStream_t stream0 = reinterpret_cast(0x1); + hipStream_t stream1 = reinterpret_cast(0x2); + + gpuOps->addEventRecordActivity( + start_time_ns + 20, start_time_ns + 25, 10, fakeEvent, stream0); + gpuOps->addSyncActivity( + start_time_ns + 30, start_time_ns + 35, 20, + ROCPROF_SYNC_STREAM_WAIT_EVENT, stream1, fakeEvent, stream0, 10); + gpuOps->addKernelActivity(start_time_ns + 50, start_time_ns + 100, 1); + rocActivities_.activityLogger = std::move(gpuOps); + + auto logger = std::make_unique(*cfg_); + profiler.processTrace(*logger); + profiler.reset(); + + ActivityTrace trace(std::move(logger), loggerFactory); + + // Verify event record and sync activities are present + int eventRecordCount = 0; + int syncCount = 0; + for (auto& activity : *trace.activities()) { + if (activity->name() == "hipEventRecord") { + eventRecordCount++; + } + if (activity->name() == "hipStreamWaitEvent") { + syncCount++; + } + } + EXPECT_EQ(eventRecordCount, 1); + EXPECT_EQ(syncCount, 1); + + // Verify JSON output contains dependency metadata + char filename[] = "/tmp/libkineto_interstream_testXXXXXX.json"; + { int tmp_fd = mkstemps(filename, 5); + if (tmp_fd >= 0) close(tmp_fd); } + trace.save(filename); + + std::ifstream f(filename); + nlohmann::json j = nlohmann::json::parse(f); + auto& traceEvents = j["traceEvents"]; + + bool foundEventRecord = false; + bool foundSyncWithDep = false; + for (auto& ev : traceEvents) { + if (ev.value("name", "") == "hipEventRecord") { + foundEventRecord = true; + EXPECT_TRUE(ev["args"].contains("hip_event")); + EXPECT_TRUE(ev["args"].contains("hip_stream")); + } + if (ev.value("name", "") == "hipStreamWaitEvent") { + foundSyncWithDep = true; + EXPECT_TRUE(ev["args"].contains("sync_type")); + EXPECT_EQ(ev["args"]["sync_type"], "stream_wait_event"); + EXPECT_TRUE(ev["args"].contains("wait_on_stream")); + EXPECT_TRUE(ev["args"].contains("wait_on_hip_event_record_corr_id")); + EXPECT_EQ(ev["args"]["wait_on_hip_event_record_corr_id"], 10); + } + } + EXPECT_TRUE(foundEventRecord); + EXPECT_TRUE(foundSyncWithDep); + + unlink(filename); +} + +// Drives the production lookup helpers (RocprofLogger::recordEvent / +// resolveWait / clearEventMap) so we exercise the same path used by the +// rocprofiler-sdk api_callback. Verifies B1's vector-backed g_eventMap and +// the upper_bound semantics on event-handle reuse. +TEST_F(RocmActivityProfilerTest, StreamWaitEventFutureCorrelation) { + // Out-of-order delivery: two hipEventRecord callbacks land on the same + // event handle (corr=100 then corr=200), and a hipStreamWaitEvent with + // corr=101 lands AFTER both records. The wait should attribute its + // producer to corr=100 (most recent record < 101), not corr=200. + RocprofLogger::clearEventMap(); + + RocmActivityProfiler profiler(rocActivities_, /*cpu only*/ false); + int64_t start_time_ns = + libkineto::timeSinceEpoch(std::chrono::system_clock::now()); + int64_t duration_ns = 500; + auto start_time = time_point(nanoseconds(start_time_ns)); + profiler.configure(*cfg_, start_time); + profiler.startTrace(start_time); + profiler.stopTrace(start_time + nanoseconds(duration_ns)); + profiler.recordThreadInfo(); + + auto cpuOps = std::make_unique( + start_time_ns, start_time_ns + duration_ns); + cpuOps->addOp("op1", start_time_ns + 10, start_time_ns + 30, 1); + profiler.transferCpuTrace(std::move(cpuOps)); + + auto gpuOps = std::make_unique(); + hipEvent_t fakeEvent = reinterpret_cast(0xA); + hipStream_t producer1 = reinterpret_cast(0x1); + hipStream_t producer2 = reinterpret_cast(0x2); + hipStream_t consumer = reinterpret_cast(0x3); + + // Two records on the same event handle, corr=100 and corr=200. + gpuOps->addEventRecordActivity( + start_time_ns + 20, start_time_ns + 25, 100, fakeEvent, producer1); + gpuOps->addEventRecordActivity( + start_time_ns + 60, start_time_ns + 65, 200, fakeEvent, producer2); + // Wait at corr=101 must resolve to corr=100, not corr=200. + gpuOps->addSyncActivityResolvingFromMap( + start_time_ns + 40, start_time_ns + 45, 101, + ROCPROF_SYNC_STREAM_WAIT_EVENT, consumer, fakeEvent); + rocActivities_.activityLogger = std::move(gpuOps); + + auto logger = std::make_unique(*cfg_); + profiler.processTrace(*logger); + profiler.reset(); + + ActivityTrace trace(std::move(logger), loggerFactory); + + char filename[] = "/tmp/libkineto_future_corrXXXXXX.json"; + { int tmp_fd = mkstemps(filename, 5); + if (tmp_fd >= 0) close(tmp_fd); } + trace.save(filename); + + std::ifstream f(filename); + nlohmann::json j = nlohmann::json::parse(f); + bool foundWait = false; + for (auto& ev : j["traceEvents"]) { + if (ev.value("name", "") == "hipStreamWaitEvent" && + ev.contains("args") && + ev["args"].contains("wait_on_hip_event_record_corr_id")) { + foundWait = true; + EXPECT_EQ(ev["args"]["wait_on_hip_event_record_corr_id"], 100) + << "Wait should reference the record before it, not the future one"; + } + } + EXPECT_TRUE(foundWait); + unlink(filename); + + RocprofLogger::clearEventMap(); +} + +// Verifies B1's clear-on-reset behavior: records from a prior profiling +// session must not leak into the next session's wait resolution. +TEST_F(RocmActivityProfilerTest, EventMapClearedOnReset) { + RocprofLogger::clearEventMap(); + + hipEvent_t fakeEvent = reinterpret_cast(0xB); + hipStream_t producer = reinterpret_cast(0x1); + + // Session 1: seed g_eventMap with a record, then call onResetTraceData + // (via profiler.reset()) which should clear it. + { + RocmActivityProfiler profiler(rocActivities_, /*cpu only*/ false); + int64_t start_time_ns = + libkineto::timeSinceEpoch(std::chrono::system_clock::now()); + auto start_time = time_point(nanoseconds(start_time_ns)); + profiler.configure(*cfg_, start_time); + profiler.startTrace(start_time); + profiler.stopTrace(start_time + nanoseconds(500)); + profiler.recordThreadInfo(); + + auto cpuOps = std::make_unique( + start_time_ns, start_time_ns + 500); + cpuOps->addOp("op1", start_time_ns + 10, start_time_ns + 30, 1); + profiler.transferCpuTrace(std::move(cpuOps)); + + auto gpuOps = std::make_unique(); + gpuOps->addEventRecordActivity( + start_time_ns + 20, start_time_ns + 25, 50, fakeEvent, producer); + rocActivities_.activityLogger = std::move(gpuOps); + + auto logger = std::make_unique(*cfg_); + profiler.processTrace(*logger); + profiler.reset(); + } + + // Session 2: wait on the same event handle. With clearEventMap() called + // on reset, the wait must NOT resolve to session 1's stale record. + void* outStream = nullptr; + uint64_t outCorrId = 0; + EXPECT_FALSE(RocprofLogger::resolveWait( + static_cast(fakeEvent), 999, &outStream, &outCorrId)) + << "Stale record from a prior session should have been cleared"; +} + +// Verifies B3's CUPTI parity: hipEventSynchronize gets producer +// attribution just like hipStreamWaitEvent, and the always-emitted +// wait_on_hip_event_id field is present. +TEST_F(RocmActivityProfilerTest, EventSynchronizeResolvesProducer) { + RocprofLogger::clearEventMap(); + + RocmActivityProfiler profiler(rocActivities_, /*cpu only*/ false); + int64_t start_time_ns = + libkineto::timeSinceEpoch(std::chrono::system_clock::now()); + int64_t duration_ns = 300; + auto start_time = time_point(nanoseconds(start_time_ns)); + profiler.configure(*cfg_, start_time); + profiler.startTrace(start_time); + profiler.stopTrace(start_time + nanoseconds(duration_ns)); + profiler.recordThreadInfo(); + + auto cpuOps = std::make_unique( + start_time_ns, start_time_ns + duration_ns); + cpuOps->addOp("op1", start_time_ns + 10, start_time_ns + 30, 1); + profiler.transferCpuTrace(std::move(cpuOps)); + + auto gpuOps = std::make_unique(); + hipEvent_t fakeEvent = reinterpret_cast(0xC); + hipStream_t producer = reinterpret_cast(0x1); + // hipEventSynchronize is consumer-stream-less; pass nullptr for the + // consumer stream as the real API does. + hipStream_t consumer = nullptr; + + gpuOps->addEventRecordActivity( + start_time_ns + 20, start_time_ns + 25, 30, fakeEvent, producer); + gpuOps->addSyncActivityResolvingFromMap( + start_time_ns + 50, start_time_ns + 60, 40, + ROCPROF_SYNC_EVENT_SYNCHRONIZE, consumer, fakeEvent); + rocActivities_.activityLogger = std::move(gpuOps); + + auto logger = std::make_unique(*cfg_); + profiler.processTrace(*logger); + profiler.reset(); + + ActivityTrace trace(std::move(logger), loggerFactory); + + char filename[] = "/tmp/libkineto_event_syncXXXXXX.json"; + { int tmp_fd = mkstemps(filename, 5); + if (tmp_fd >= 0) close(tmp_fd); } + trace.save(filename); + + std::ifstream f(filename); + nlohmann::json j = nlohmann::json::parse(f); + bool foundEventSync = false; + for (auto& ev : j["traceEvents"]) { + if (ev.value("name", "") == "hipEventSynchronize" && + ev.contains("args")) { + foundEventSync = true; + EXPECT_EQ(ev["args"]["sync_type"], "event_synchronize"); + // wait_on_hip_event_id is always emitted for event-sync types + EXPECT_TRUE(ev["args"].contains("wait_on_hip_event_id")); + // Producer attribution resolved from g_eventMap + EXPECT_TRUE(ev["args"].contains("wait_on_hip_event_record_corr_id")); + EXPECT_EQ(ev["args"]["wait_on_hip_event_record_corr_id"], 30); + EXPECT_TRUE(ev["args"].contains("wait_on_stream")); + } + } + EXPECT_TRUE(foundEventSync) << "hipEventSynchronize activity not found"; + unlink(filename); + + RocprofLogger::clearEventMap(); +} + +// Verifies wait_on_hip_event_id is emitted even when the producer record is +// absent (e.g., recorded before the trace window). +TEST_F(RocmActivityProfilerTest, UnresolvedWaitStillEmitsEventId) { + RocprofLogger::clearEventMap(); + + RocmActivityProfiler profiler(rocActivities_, /*cpu only*/ false); + int64_t start_time_ns = + libkineto::timeSinceEpoch(std::chrono::system_clock::now()); + int64_t duration_ns = 300; + auto start_time = time_point(nanoseconds(start_time_ns)); + profiler.configure(*cfg_, start_time); + profiler.startTrace(start_time); + profiler.stopTrace(start_time + nanoseconds(duration_ns)); + profiler.recordThreadInfo(); + + auto cpuOps = std::make_unique( + start_time_ns, start_time_ns + duration_ns); + cpuOps->addOp("op1", start_time_ns + 10, start_time_ns + 30, 1); + profiler.transferCpuTrace(std::move(cpuOps)); + + // No addEventRecordActivity: the wait fires on an event that was never + // observed in this trace window. + auto gpuOps = std::make_unique(); + hipEvent_t fakeEvent = reinterpret_cast(0xD); + hipStream_t consumer = reinterpret_cast(0x2); + gpuOps->addSyncActivityResolvingFromMap( + start_time_ns + 50, start_time_ns + 60, 70, + ROCPROF_SYNC_STREAM_WAIT_EVENT, consumer, fakeEvent); + rocActivities_.activityLogger = std::move(gpuOps); + + auto logger = std::make_unique(*cfg_); + profiler.processTrace(*logger); + profiler.reset(); + + ActivityTrace trace(std::move(logger), loggerFactory); + + char filename[] = "/tmp/libkineto_unresolved_waitXXXXXX.json"; + { int tmp_fd = mkstemps(filename, 5); + if (tmp_fd >= 0) close(tmp_fd); } + trace.save(filename); + + std::ifstream f(filename); + nlohmann::json j = nlohmann::json::parse(f); + bool foundUnresolvedWait = false; + for (auto& ev : j["traceEvents"]) { + if (ev.value("name", "") == "hipStreamWaitEvent" && + ev.contains("args")) { + foundUnresolvedWait = true; + // Event id must still be emitted for tooling correlation. + EXPECT_TRUE(ev["args"].contains("wait_on_hip_event_id")); + // Producer attribution must be absent. + EXPECT_FALSE(ev["args"].contains("wait_on_hip_event_record_corr_id")); + EXPECT_FALSE(ev["args"].contains("wait_on_stream")); + } + } + EXPECT_TRUE(foundUnresolvedWait); + unlink(filename); +} +#endif