diff --git a/ddtrace/internal/datadog/profiling/stack/echion/echion/threads.h b/ddtrace/internal/datadog/profiling/stack/echion/echion/threads.h index 82fa33edabf..ff91666051f 100644 --- a/ddtrace/internal/datadog/profiling/stack/echion/echion/threads.h +++ b/ddtrace/internal/datadog/profiling/stack/echion/echion/threads.h @@ -15,8 +15,10 @@ #include #include +#include #include #include +#include #if defined PL_LINUX #include @@ -42,7 +44,19 @@ class ThreadInfo uintptr_t thread_id; unsigned long native_id; FrameStack python_stack; - std::vector> current_tasks; + + // current_tasks and task_count_ together form a reusable buffer: + // entries are kept alive between samples so the inner FrameStack capacity + // amortizes. The valid range each sample is [0, task_count_); the + // outer vector grows on demand only when a sample exceeds prior peak. + std::vector current_tasks; + size_t task_count_ = 0; + + // Reusable per-task coroutine stack buffers keyed by Task object. Long-lived + // Tasks keep their FrameStack capacity across samples; stale keys are erased + // on the next task unwind for this thread. + std::unordered_map task_coro_stacks_; + std::vector> current_greenlets; std::string name; diff --git a/ddtrace/internal/datadog/profiling/stack/src/echion/threads.cc b/ddtrace/internal/datadog/profiling/stack/src/echion/threads.cc index f2078c066c0..aea047dd5d2 100644 --- a/ddtrace/internal/datadog/profiling/stack/src/echion/threads.cc +++ b/ddtrace/internal/datadog/profiling/stack/src/echion/threads.cc @@ -29,6 +29,10 @@ ThreadInfo::unwind(EchionSampler& echion, PyThreadState* tstate) Result ThreadInfo::unwind_tasks(EchionSampler& echion, PyThreadState* tstate) { + // Reset the active range before doing any work so failures cannot render + // stale task stacks from a previous sample. Stack capacity is preserved. + task_count_ = 0; + // The size of the "pure Python" stack (before asyncio Frames). // Defaults to the full Python stack size (and updated if we find the boundary frame) size_t upper_python_stack_size = python_stack.size(); @@ -131,6 +135,13 @@ ThreadInfo::unwind_tasks(EchionSampler& echion, PyThreadState* tstate) auto all_tasks = std::move(*maybe_all_tasks); echion.add_asyncio_task_count(all_tasks.size()); + + std::unordered_set all_task_origins; + std::transform(all_tasks.cbegin(), + all_tasks.cend(), + std::inserter(all_task_origins, all_task_origins.begin()), + [](const TaskInfo::Ptr& task) { return task->origin; }); + { auto& previous_task_objects = echion.previous_task_objects(); std::lock_guard lock(echion.task_link_map_lock()); @@ -140,12 +151,6 @@ ThreadInfo::unwind_tasks(EchionSampler& echion, PyThreadState* tstate) // Clean up the task_link_map. Remove entries associated to tasks that // no longer exist. - std::unordered_set all_task_origins; - std::transform(all_tasks.cbegin(), - all_tasks.cend(), - std::inserter(all_task_origins, all_task_origins.begin()), - [](const TaskInfo::Ptr& task) { return task->origin; }); - std::vector to_remove; for (auto kv : task_link_map) { if (all_task_origins.find(kv.first) == all_task_origins.end()) @@ -167,12 +172,6 @@ ThreadInfo::unwind_tasks(EchionSampler& echion, PyThreadState* tstate) // Clean up the weak_task_link_map. // Remove entries associated to tasks that no longer exist. - all_task_origins.clear(); - std::transform(all_tasks.cbegin(), - all_tasks.cend(), - std::inserter(all_task_origins, all_task_origins.begin()), - [](const TaskInfo::Ptr& task) { return task->origin; }); - to_remove.clear(); for (auto kv : weak_task_link_map) { if (all_task_origins.find(kv.first) == all_task_origins.end()) @@ -196,11 +195,9 @@ ThreadInfo::unwind_tasks(EchionSampler& echion, PyThreadState* tstate) } } - // Copy all Task object pointers into previous_task_objects + // Copy all Task object pointers into previous_task_objects. previous_task_objects.clear(); - for (const auto& task : all_tasks) { - previous_task_objects.insert(task->origin); - } + previous_task_objects.insert(all_task_origins.cbegin(), all_task_origins.cend()); } for (auto& task : all_tasks) { @@ -216,12 +213,20 @@ ThreadInfo::unwind_tasks(EchionSampler& echion, PyThreadState* tstate) // Pre-compute per-task coroutine stacks so that each task's coroutine chain is walked exactly once. // Without this, a parent task's coroutine chain would be walked once for each child task that // references it in its task chain (e.g. 10 children from asyncio.gather = 10 redundant unwinds - // of the parent's coroutine chain). - std::unordered_map task_coro_stacks; + // of the parent's coroutine chain). Keep the FrameStack buffers keyed by Task object so long-lived + // tasks reuse their vector capacity across samples. + for (auto it = task_coro_stacks_.begin(); it != task_coro_stacks_.end();) { + if (all_task_origins.find(it->first) == all_task_origins.end()) { + it = task_coro_stacks_.erase(it); + } else { + it->second.clear(); + ++it; + } + } for (auto& task : all_tasks) { - FrameStack task_stack; + auto& task_stack = task_coro_stacks_[task->origin]; + task_stack.clear(); task->unwind(echion, task_stack, using_uvloop); - task_coro_stacks.emplace(task->origin, std::move(task_stack)); } // Make sure the on CPU task is first @@ -235,8 +240,17 @@ ThreadInfo::unwind_tasks(EchionSampler& echion, PyThreadState* tstate) } for (auto& leaf_task : leaf_tasks) { - auto stack_info = std::make_unique(leaf_task.get().name, leaf_task.get().is_on_cpu); - auto& stack = stack_info->stack; + // Reuse an existing StackInfo slot if available, otherwise grow. + if (task_count_ < current_tasks.size()) { + auto& slot = current_tasks[task_count_]; + slot.task_name = leaf_task.get().name; + slot.on_cpu = leaf_task.get().is_on_cpu; + slot.stack.clear(); + } else { + current_tasks.emplace_back(leaf_task.get().name, leaf_task.get().is_on_cpu); + } + auto& stack_info = current_tasks[task_count_]; + auto& stack = stack_info.stack; // Safety: prevent infinite loops from cycles in task chain maps size_t task_chain_depth = 0; @@ -254,7 +268,7 @@ ThreadInfo::unwind_tasks(EchionSampler& echion, PyThreadState* tstate) const FrameStack* task_stack = nullptr; size_t task_stack_size = 0; size_t task_frames_to_push = 0; - if (auto it = task_coro_stacks.find(task.origin); it != task_coro_stacks.end()) { + if (auto it = task_coro_stacks_.find(task.origin); it != task_coro_stacks_.end()) { task_stack = &it->second; task_stack_size = task_stack->size(); if (stack.size() < max_frames) { @@ -343,7 +357,7 @@ ThreadInfo::unwind_tasks(EchionSampler& echion, PyThreadState* tstate) stack.push_back(python_frame); } - current_tasks.push_back(std::move(stack_info)); + ++task_count_; } return Result::ok(); @@ -728,17 +742,18 @@ ThreadInfo::sample(EchionSampler& echion, PyThreadState* tstate, microsecond_t d // 1. asyncio Tasks stacks (if any) // 2. Greenlets stacks (if any) // 3. The normal thread stack (if no asyncio tasks or greenlets) - if (!current_tasks.empty()) { - for (auto& task_stack_info : current_tasks) { - task_stack_info->task_name.visit_string( - [&](std::string_view task_name) { renderer.render_task_begin(task_name, task_stack_info->on_cpu); }); + if (task_count_ > 0) { + for (size_t i = 0; i < task_count_; i++) { + auto& task_stack_info = current_tasks[i]; + task_stack_info.task_name.visit_string( + [&](std::string_view task_name) { renderer.render_task_begin(task_name, task_stack_info.on_cpu); }); - task_stack_info->stack.render(echion); + task_stack_info.stack.render(echion); renderer.render_stack_end(); } - current_tasks.clear(); + task_count_ = 0; } else if (!current_greenlets.empty()) { for (auto& greenlet_stack : current_greenlets) { greenlet_stack->task_name.visit_string( diff --git a/releasenotes/notes/perf-profiling-unwind-asyncio-tasks-buffer-reuse-4f6e2a8b9c1d0e3f.yaml b/releasenotes/notes/perf-profiling-unwind-asyncio-tasks-buffer-reuse-4f6e2a8b9c1d0e3f.yaml new file mode 100644 index 00000000000..823c125228a --- /dev/null +++ b/releasenotes/notes/perf-profiling-unwind-asyncio-tasks-buffer-reuse-4f6e2a8b9c1d0e3f.yaml @@ -0,0 +1,5 @@ +--- +fixes: + - | + profiling: Further reduces native heap memory used by the stack collector + in asyncio workloads with many tracked tasks.