Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@

#include <cstdint>
#include <functional>
#include <memory>
#include <mutex>
#include <unordered_map>
#include <vector>

#if defined PL_LINUX
#include <ctime>
Expand All @@ -42,7 +44,19 @@ class ThreadInfo
uintptr_t thread_id;
unsigned long native_id;
FrameStack python_stack;
std::vector<std::unique_ptr<StackInfo>> current_tasks;

// current_tasks and task_count_ together form a reusable buffer:
// entries are kept alive between samples so the inner FrameStack capacity
// amortizes. The valid range each sample is [0, task_count_); the
// outer vector grows on demand only when a sample exceeds prior peak.
std::vector<StackInfo> current_tasks;
size_t task_count_ = 0;

// Reusable per-task coroutine stack buffers keyed by Task object. Long-lived
// Tasks keep their FrameStack capacity across samples; stale keys are erased
// on the next task unwind for this thread.
std::unordered_map<PyObject*, FrameStack> task_coro_stacks_;

std::vector<std::unique_ptr<StackInfo>> current_greenlets;

std::string name;
Expand Down
75 changes: 45 additions & 30 deletions ddtrace/internal/datadog/profiling/stack/src/echion/threads.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ ThreadInfo::unwind(EchionSampler& echion, PyThreadState* tstate)
Result<void>
ThreadInfo::unwind_tasks(EchionSampler& echion, PyThreadState* tstate)
{
// Reset the active range before doing any work so failures cannot render
// stale task stacks from a previous sample. Stack capacity is preserved.
task_count_ = 0;

// The size of the "pure Python" stack (before asyncio Frames).
// Defaults to the full Python stack size (and updated if we find the boundary frame)
size_t upper_python_stack_size = python_stack.size();
Expand Down Expand Up @@ -131,6 +135,13 @@ ThreadInfo::unwind_tasks(EchionSampler& echion, PyThreadState* tstate)

auto all_tasks = std::move(*maybe_all_tasks);
echion.add_asyncio_task_count(all_tasks.size());

std::unordered_set<PyObject*> all_task_origins;
std::transform(all_tasks.cbegin(),
all_tasks.cend(),
std::inserter(all_task_origins, all_task_origins.begin()),
[](const TaskInfo::Ptr& task) { return task->origin; });

{
auto& previous_task_objects = echion.previous_task_objects();
std::lock_guard<std::mutex> lock(echion.task_link_map_lock());
Expand All @@ -140,12 +151,6 @@ ThreadInfo::unwind_tasks(EchionSampler& echion, PyThreadState* tstate)

// Clean up the task_link_map. Remove entries associated to tasks that
// no longer exist.
std::unordered_set<PyObject*> all_task_origins;
std::transform(all_tasks.cbegin(),
all_tasks.cend(),
std::inserter(all_task_origins, all_task_origins.begin()),
[](const TaskInfo::Ptr& task) { return task->origin; });

std::vector<PyObject*> to_remove;
for (auto kv : task_link_map) {
if (all_task_origins.find(kv.first) == all_task_origins.end())
Expand All @@ -167,12 +172,6 @@ ThreadInfo::unwind_tasks(EchionSampler& echion, PyThreadState* tstate)

// Clean up the weak_task_link_map.
// Remove entries associated to tasks that no longer exist.
all_task_origins.clear();
std::transform(all_tasks.cbegin(),
all_tasks.cend(),
std::inserter(all_task_origins, all_task_origins.begin()),
[](const TaskInfo::Ptr& task) { return task->origin; });

to_remove.clear();
for (auto kv : weak_task_link_map) {
if (all_task_origins.find(kv.first) == all_task_origins.end())
Expand All @@ -196,11 +195,9 @@ ThreadInfo::unwind_tasks(EchionSampler& echion, PyThreadState* tstate)
}
}

// Copy all Task object pointers into previous_task_objects
// Copy all Task object pointers into previous_task_objects.
previous_task_objects.clear();
for (const auto& task : all_tasks) {
previous_task_objects.insert(task->origin);
}
previous_task_objects.insert(all_task_origins.cbegin(), all_task_origins.cend());
}

for (auto& task : all_tasks) {
Expand All @@ -216,12 +213,20 @@ ThreadInfo::unwind_tasks(EchionSampler& echion, PyThreadState* tstate)
// Pre-compute per-task coroutine stacks so that each task's coroutine chain is walked exactly once.
// Without this, a parent task's coroutine chain would be walked once for each child task that
// references it in its task chain (e.g. 10 children from asyncio.gather = 10 redundant unwinds
// of the parent's coroutine chain).
std::unordered_map<PyObject*, FrameStack> task_coro_stacks;
// of the parent's coroutine chain). Keep the FrameStack buffers keyed by Task object so long-lived
// tasks reuse their vector capacity across samples.
for (auto it = task_coro_stacks_.begin(); it != task_coro_stacks_.end();) {
if (all_task_origins.find(it->first) == all_task_origins.end()) {
it = task_coro_stacks_.erase(it);
} else {
it->second.clear();
++it;
}
}
for (auto& task : all_tasks) {
FrameStack task_stack;
auto& task_stack = task_coro_stacks_[task->origin];
task_stack.clear();
task->unwind(echion, task_stack, using_uvloop);
task_coro_stacks.emplace(task->origin, std::move(task_stack));
}

// Make sure the on CPU task is first
Expand All @@ -235,8 +240,17 @@ ThreadInfo::unwind_tasks(EchionSampler& echion, PyThreadState* tstate)
}

for (auto& leaf_task : leaf_tasks) {
auto stack_info = std::make_unique<StackInfo>(leaf_task.get().name, leaf_task.get().is_on_cpu);
auto& stack = stack_info->stack;
// Reuse an existing StackInfo slot if available, otherwise grow.
if (task_count_ < current_tasks.size()) {
auto& slot = current_tasks[task_count_];
slot.task_name = leaf_task.get().name;
slot.on_cpu = leaf_task.get().is_on_cpu;
slot.stack.clear();
} else {
current_tasks.emplace_back(leaf_task.get().name, leaf_task.get().is_on_cpu);
}
auto& stack_info = current_tasks[task_count_];
auto& stack = stack_info.stack;

// Safety: prevent infinite loops from cycles in task chain maps
size_t task_chain_depth = 0;
Expand All @@ -254,7 +268,7 @@ ThreadInfo::unwind_tasks(EchionSampler& echion, PyThreadState* tstate)
const FrameStack* task_stack = nullptr;
size_t task_stack_size = 0;
size_t task_frames_to_push = 0;
if (auto it = task_coro_stacks.find(task.origin); it != task_coro_stacks.end()) {
if (auto it = task_coro_stacks_.find(task.origin); it != task_coro_stacks_.end()) {
task_stack = &it->second;
task_stack_size = task_stack->size();
if (stack.size() < max_frames) {
Expand Down Expand Up @@ -343,7 +357,7 @@ ThreadInfo::unwind_tasks(EchionSampler& echion, PyThreadState* tstate)
stack.push_back(python_frame);
}

current_tasks.push_back(std::move(stack_info));
++task_count_;
}

return Result<void>::ok();
Expand Down Expand Up @@ -728,17 +742,18 @@ ThreadInfo::sample(EchionSampler& echion, PyThreadState* tstate, microsecond_t d
// 1. asyncio Tasks stacks (if any)
// 2. Greenlets stacks (if any)
// 3. The normal thread stack (if no asyncio tasks or greenlets)
if (!current_tasks.empty()) {
for (auto& task_stack_info : current_tasks) {
task_stack_info->task_name.visit_string(
[&](std::string_view task_name) { renderer.render_task_begin(task_name, task_stack_info->on_cpu); });
if (task_count_ > 0) {
for (size_t i = 0; i < task_count_; i++) {
auto& task_stack_info = current_tasks[i];
task_stack_info.task_name.visit_string(
[&](std::string_view task_name) { renderer.render_task_begin(task_name, task_stack_info.on_cpu); });

task_stack_info->stack.render(echion);
task_stack_info.stack.render(echion);

renderer.render_stack_end();
}

current_tasks.clear();
task_count_ = 0;
} else if (!current_greenlets.empty()) {
for (auto& greenlet_stack : current_greenlets) {
greenlet_stack->task_name.visit_string(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
fixes:
- |
profiling: Further reduces native heap memory used by the stack collector
in asyncio workloads with many tracked tasks.
Loading