diff --git a/ddtrace/internal/settings/_supported_configurations.py b/ddtrace/internal/settings/_supported_configurations.py index c91ef3f54ab..4f9d6e402f9 100644 --- a/ddtrace/internal/settings/_supported_configurations.py +++ b/ddtrace/internal/settings/_supported_configurations.py @@ -357,6 +357,7 @@ "DD_PROFILING_EXCEPTION_ENABLED", "DD_PROFILING_EXCEPTION_SAMPLING_INTERVAL", "DD_PROFILING_FILE_PATH", + "DD_PROFILING_GC_ENABLED", "DD_PROFILING_HEAP_ENABLED", "DD_PROFILING_HEAP_SAMPLE_SIZE", "DD_PROFILING_IGNORE_PROFILER", diff --git a/ddtrace/internal/settings/profiling.py b/ddtrace/internal/settings/profiling.py index 7f9b7b9b90d..fe1546d2d6d 100644 --- a/ddtrace/internal/settings/profiling.py +++ b/ddtrace/internal/settings/profiling.py @@ -40,7 +40,7 @@ def _derive_default_heap_sample_size( try: from ddtrace.vendor import psutil - total_mem = psutil.swap_memory().total + psutil.virtual_memory().total + total_mem = psutil.swap_memory().total + psutil.virtual_memory().total # type: ignore[no-untyped-call] except Exception: logger.warning( "Unable to get total memory available, using default value of %d KB", @@ -496,6 +496,18 @@ class ProfilingConfigPytorch(DDConfig): ) +class ProfilingConfigGC(DDConfig): + __item__ = __prefix__ = "gc" + + enabled = DDConfig.v( + bool, + "enabled", + default=True, + help_type="Boolean", + help="Whether to enable the GC collector (pause durations, collection counts).", + ) + + class ProfilingConfigException(DDConfig): __item__ = __prefix__ = "exception" @@ -534,6 +546,7 @@ class ProfilingConfigException(DDConfig): ProfilingConfig.include(ProfilingConfigLock, namespace="lock") ProfilingConfig.include(ProfilingConfigMemory, namespace="memory") ProfilingConfig.include(ProfilingConfigHeap, namespace="heap") +ProfilingConfig.include(ProfilingConfigGC, namespace="gc") ProfilingConfig.include(ProfilingConfigPytorch, namespace="pytorch") ProfilingConfig.include(ProfilingConfigException, namespace="exception") @@ -584,6 +597,8 @@ def config_str(config: ProfilingConfig) -> str: configured_features.append("mem") if config.heap.sample_size > 0: configured_features.append("heap") + if config.gc.enabled: + configured_features.append("gc") if config.pytorch.enabled: configured_features.append("pytorch") if config.exception.enabled: diff --git a/ddtrace/profiling/collector/gc.py b/ddtrace/profiling/collector/gc.py new file mode 100644 index 00000000000..1c3ee4eab15 --- /dev/null +++ b/ddtrace/profiling/collector/gc.py @@ -0,0 +1,111 @@ +from __future__ import annotations + +import gc +import logging +import threading +import time +from typing import Callable + +from ddtrace.internal.datadog.profiling import ddup +from ddtrace.profiling import collector + + +LOG = logging.getLogger(__name__) + +_GEN_NAMES: tuple[str, ...] = ( + "gc.collect[gen=0]", + "gc.collect[gen=1]", + "gc.collect[gen=2]", +) + + +class GCCollector(collector.Collector): + """Collect GC pause durations, collection counts, and configuration state. + + Hooks gc.callbacks for per-collection events and emits a snapshot sample + once per profile flush interval with cumulative counts and config state. + + Data emitted: + - Wall time samples (push_walltime) attributed to synthetic gc.collect[gen=N] + frames — appear in the Wall Time profile view. + - Alloc samples (push_alloc) with collected-object count — appear in the + Alloc profile view under the same frames. + - A gc.config snapshot sample per flush carrying explicit gc.collect() call + count in the sample count field. + """ + + def _start_service(self) -> None: + self._start_ns: dict[int, int] = {} + self._explicit_count: int = 0 + self._count_lock = threading.Lock() + self._orig_collect: Callable[..., int] = gc.collect + gc.collect = self._patched_collect + gc.callbacks.append(self._on_gc) + LOG.debug("GCCollector started") + + def _stop_service(self) -> None: + try: + gc.callbacks.remove(self._on_gc) + except ValueError: + pass + gc.collect = self._orig_collect + LOG.debug("GCCollector stopped") + + def _patched_collect(self, generation: int = 2) -> int: + with self._count_lock: + self._explicit_count += 1 + return self._orig_collect(generation) + + def _on_gc(self, phase: str, info: dict[str, int]) -> None: + gen = info.get("generation", 0) + if phase == "start": + self._start_ns[gen] = time.monotonic_ns() + elif phase == "stop": + start = self._start_ns.pop(gen, None) + if start is None: + return + pause_ns = time.monotonic_ns() - start + frame_name = _GEN_NAMES[gen] if gen < len(_GEN_NAMES) else "gc.collect" + + handle = ddup.SampleHandle() + handle.push_walltime(pause_ns, 1) + handle.push_frame(frame_name, "gc", 0, 0) + handle.push_monotonic_ns(time.monotonic_ns()) + handle.flush_sample() + + collected = info.get("collected", 0) + if collected > 0: + handle2 = ddup.SampleHandle() + handle2.push_alloc(collected, 1) + handle2.push_frame(frame_name, "gc", 0, 0) + handle2.push_monotonic_ns(time.monotonic_ns()) + handle2.flush_sample() + + def snapshot(self) -> None: # type: ignore[override] + with self._count_lock: + explicit = self._explicit_count + self._explicit_count = 0 + + handle = ddup.SampleHandle() + # Use count field to carry explicit gc.collect() tally for this interval. + # A zero walltime with count > 0 is the established pattern for pure-count + # samples (same as lock release-time samples with zero duration). + handle.push_walltime(0, explicit) + handle.push_frame("gc.config", "gc", 0, 0) + handle.push_monotonic_ns(time.monotonic_ns()) + handle.flush_sample() + + if LOG.isEnabledFor(logging.DEBUG): + thresholds = gc.get_threshold() + enabled = gc.isenabled() + freeze_count = gc.get_freeze_count() if hasattr(gc, "get_freeze_count") else 0 + stats = gc.get_stats() + total_collections = sum(s.get("collections", 0) for s in stats) + LOG.debug( + "GCCollector snapshot: enabled=%s thresholds=%s freeze=%d total_collections=%d explicit_collect=%d", + enabled, + thresholds, + freeze_count, + total_collections, + explicit, + ) diff --git a/ddtrace/profiling/profiler.py b/ddtrace/profiling/profiler.py index e334be5896f..dcf79998b06 100644 --- a/ddtrace/profiling/profiler.py +++ b/ddtrace/profiling/profiler.py @@ -26,6 +26,7 @@ from ddtrace.profiling import scheduler from ddtrace.profiling.collector import asyncio from ddtrace.profiling.collector import exception +from ddtrace.profiling.collector import gc as gc_collector from ddtrace.profiling.collector import memalloc from ddtrace.profiling.collector import pytorch from ddtrace.profiling.collector import stack @@ -175,6 +176,7 @@ def __init__( _memory_collector_enabled: bool = profiling_config.memory.enabled, _stack_collector_enabled: bool = profiling_config.stack.enabled, _lock_collector_enabled: bool = profiling_config.lock.enabled, + _gc_collector_enabled: bool = profiling_config.gc.enabled, _pytorch_collector_enabled: bool = profiling_config.pytorch.enabled, _exception_profiling_enabled: bool = profiling_config.exception.enabled, enable_code_provenance: bool = profiling_config.code_provenance, @@ -191,6 +193,7 @@ def __init__( self._memory_collector_enabled: bool = _memory_collector_enabled self._stack_collector_enabled: bool = _stack_collector_enabled self._lock_collector_enabled: bool = _lock_collector_enabled + self._gc_collector_enabled: bool = _gc_collector_enabled self._pytorch_collector_enabled: bool = _pytorch_collector_enabled self._exception_profiling_enabled: bool = _exception_profiling_enabled self.enable_code_provenance: bool = enable_code_provenance @@ -266,6 +269,14 @@ def __post_init__(self) -> None: except Exception: LOG.error("Failed to start exception collector, disabling.", exc_info=True) + if self._gc_collector_enabled: + LOG.debug("Profiling collector (gc) enabled") + try: + self._collectors.append(gc_collector.GCCollector()) + LOG.debug("Profiling collector (gc) initialized") + except Exception: + LOG.error("Failed to start gc collector, disabling.", exc_info=True) + if self._stack_collector_enabled: LOG.debug("Profiling collector (stack) enabled") try: diff --git a/supported-configurations.json b/supported-configurations.json index aca601390fb..b34c169a5d5 100644 --- a/supported-configurations.json +++ b/supported-configurations.json @@ -2710,6 +2710,13 @@ "default": null } ], + "DD_PROFILING_GC_ENABLED": [ + { + "implementation": "C", + "type": "boolean", + "default": "true" + } + ], "DD_PROFILING_HEAP_ENABLED": [ { "implementation": "C", diff --git a/tests/profiling/collector/test_collector.py b/tests/profiling/collector/test_collector.py index f52970085d2..a8194a33a83 100644 --- a/tests/profiling/collector/test_collector.py +++ b/tests/profiling/collector/test_collector.py @@ -67,3 +67,15 @@ def test_capture_sampler_pure_python_fallback() -> None: sys.modules[mod_name] = saved_module sys.modules.pop(collector_mod, None) importlib.import_module(collector_mod) + # Re-attach any already-imported direct submodules as attributes of the + # freshly-reimported package. Python only does this automatically on a + # fresh import of the submodule itself; it does not update attributes on + # a parent package object that was replaced in sys.modules mid-flight. + _parent = sys.modules[collector_mod] + _prefix = collector_mod + "." + _depth = collector_mod.count(".") + 1 + for _key in list(sys.modules): + if _key.startswith(_prefix) and _key.count(".") == _depth: + _attr = _key.rsplit(".", 1)[-1] + if not hasattr(_parent, _attr): + setattr(_parent, _attr, sys.modules[_key]) diff --git a/tests/profiling/collector/test_gc.py b/tests/profiling/collector/test_gc.py new file mode 100644 index 00000000000..4475dda1fe3 --- /dev/null +++ b/tests/profiling/collector/test_gc.py @@ -0,0 +1,332 @@ +"""Tests for the GC observability collector.""" + +from __future__ import annotations + +import gc +import os +from pathlib import Path +from unittest import mock + +import pytest + +from ddtrace.internal.datadog.profiling import ddup +import ddtrace.profiling.collector.gc as _gc_module +from ddtrace.profiling.collector.gc import GCCollector +from tests.profiling.collector import pprof_utils + + +def _setup_profiler(tmp_path: Path, test_name: str) -> str: + pprof_prefix = str(tmp_path / test_name) + output_filename = pprof_prefix + "." + str(os.getpid()) + assert ddup.is_available + ddup.config(env="test", service=test_name, version="1.0", output_filename=pprof_prefix) + ddup.start() + return output_filename + + +# --------------------------------------------------------------------------- +# Unit tests — no ddup required +# --------------------------------------------------------------------------- + + +def test_gc_callbacks_registered() -> None: + col = GCCollector() + assert col._on_gc not in gc.callbacks + with mock.patch.object(_gc_module, "ddup"): + col.start() + assert col._on_gc in gc.callbacks + col.stop() + assert col._on_gc not in gc.callbacks + + +def test_gc_collect_not_patched_after_stop() -> None: + orig = gc.collect + col = GCCollector() + with mock.patch.object(_gc_module, "ddup"): + col.start() + assert gc.collect is not orig + col.stop() + assert gc.collect is orig + + +def test_explicit_count_increments() -> None: + col = GCCollector() + with mock.patch.object(_gc_module, "ddup"): + col.start() + try: + assert col._explicit_count == 0 + gc.collect() + assert col._explicit_count == 1 + gc.collect(0) + assert col._explicit_count == 2 + finally: + col.stop() + + +def test_explicit_count_resets_on_snapshot() -> None: + col = GCCollector() + with mock.patch.object(_gc_module, "ddup") as mock_ddup: + col.start() + try: + gc.collect() + gc.collect() + assert col._explicit_count == 2 + mock_handle = mock.MagicMock() + mock_ddup.SampleHandle.return_value = mock_handle + col.snapshot() + assert col._explicit_count == 0 + finally: + col.stop() + + +def _make_isolated_collector() -> GCCollector: + """Create a GCCollector with internal state initialized but NOT registered + in gc.callbacks. Use for unit tests that call _on_gc directly to avoid + interference from real background GC events. + """ + col = GCCollector() + col._start_ns = {} + col._explicit_count = 0 + col._count_lock = __import__("threading").Lock() + return col + + +def test_on_gc_records_pause_walltime() -> None: + col = _make_isolated_collector() + handles: list[mock.MagicMock] = [] + + def make_handle() -> mock.MagicMock: + h = mock.MagicMock() + handles.append(h) + return h + + with mock.patch.object(_gc_module, "ddup") as mock_ddup: + mock_ddup.SampleHandle.side_effect = make_handle + col._on_gc("start", {"generation": 0}) + col._on_gc("stop", {"generation": 0, "collected": 5, "uncollectable": 0}) + + assert len(handles) == 2 + pause_handle = handles[0] + pause_handle.push_walltime.assert_called_once() + args = pause_handle.push_walltime.call_args[0] + pause_ns, count = args + assert pause_ns >= 0 + assert count == 1 + pause_handle.push_frame.assert_called_once_with("gc.collect[gen=0]", "gc", 0, 0) + pause_handle.flush_sample.assert_called_once() + + +def test_on_gc_emits_alloc_sample_for_collected_objects() -> None: + col = _make_isolated_collector() + handles: list[mock.MagicMock] = [] + + def make_handle() -> mock.MagicMock: + h = mock.MagicMock() + handles.append(h) + return h + + with mock.patch.object(_gc_module, "ddup") as mock_ddup: + mock_ddup.SampleHandle.side_effect = make_handle + col._on_gc("start", {"generation": 1}) + col._on_gc("stop", {"generation": 1, "collected": 10, "uncollectable": 0}) + + # First handle: walltime; second handle: alloc for collected objects + assert len(handles) == 2 + alloc_handle = handles[1] + alloc_handle.push_alloc.assert_called_once_with(10, 1) + alloc_handle.push_frame.assert_called_once_with("gc.collect[gen=1]", "gc", 0, 0) + alloc_handle.flush_sample.assert_called_once() + + +def test_on_gc_no_alloc_sample_when_zero_collected() -> None: + col = _make_isolated_collector() + handles: list[mock.MagicMock] = [] + + def make_handle() -> mock.MagicMock: + h = mock.MagicMock() + handles.append(h) + return h + + with mock.patch.object(_gc_module, "ddup") as mock_ddup: + mock_ddup.SampleHandle.side_effect = make_handle + col._on_gc("start", {"generation": 0}) + col._on_gc("stop", {"generation": 0, "collected": 0, "uncollectable": 0}) + + assert len(handles) == 1 + + +def test_on_gc_stop_without_start_is_noop() -> None: + col = _make_isolated_collector() + with mock.patch.object(_gc_module, "ddup") as mock_ddup: + col._on_gc("stop", {"generation": 2, "collected": 3, "uncollectable": 0}) + mock_ddup.SampleHandle.assert_not_called() + + +def test_snapshot_emits_config_sample() -> None: + col = GCCollector() + with mock.patch.object(_gc_module, "ddup") as mock_ddup: + col.start() + try: + gc.collect() + gc.collect() + mock_ddup.reset_mock() + mock_handle = mock.MagicMock() + mock_ddup.SampleHandle.return_value = mock_handle + col.snapshot() + finally: + col.stop() + mock_handle.push_walltime.assert_called_once_with(0, 2) + mock_handle.push_frame.assert_called_once_with("gc.config", "gc", 0, 0) + mock_handle.flush_sample.assert_called_once() + + +def test_snapshot_zero_explicit_count() -> None: + col = GCCollector() + with mock.patch.object(_gc_module, "ddup") as mock_ddup: + col.start() + try: + mock_handle = mock.MagicMock() + mock_ddup.SampleHandle.return_value = mock_handle + col.snapshot() + finally: + col.stop() + mock_handle.push_walltime.assert_called_once_with(0, 0) + mock_handle.flush_sample.assert_called_once() + + +def test_on_gc_frame_names_per_generation() -> None: + col = _make_isolated_collector() + frames: list[tuple[str, ...]] = [] + + def make_handle() -> mock.MagicMock: + h = mock.MagicMock() + h.push_frame.side_effect = lambda *args: frames.append(args) + return h + + with mock.patch.object(_gc_module, "ddup") as mock_ddup: + mock_ddup.SampleHandle.side_effect = make_handle + for gen in range(3): + col._on_gc("start", {"generation": gen}) + col._on_gc("stop", {"generation": gen, "collected": 0, "uncollectable": 0}) + + expected = ["gc.collect[gen=0]", "gc.collect[gen=1]", "gc.collect[gen=2]"] + actual = [f[0] for f in frames] + assert actual == expected + + +# --------------------------------------------------------------------------- +# Integration tests — emit real ddup samples and read back pprof +# --------------------------------------------------------------------------- + + +def test_gc_pause_samples_appear_in_profile(tmp_path: Path) -> None: + output_filename = _setup_profiler(tmp_path, "test_gc_pause_samples_appear_in_profile") + + col = GCCollector() + col.start() + try: + gc.collect(0) + gc.collect(1) + gc.collect(2) + finally: + col.stop() + + ddup.upload() + + profile = pprof_utils.parse_newest_profile(output_filename) + wall_time_samples = pprof_utils.get_samples_with_value_type(profile, "wall-time") + + gc_samples = [ + s + for s in wall_time_samples + if any( + "gc.collect" in pprof_utils.get_location_from_id(profile, loc_id).function_name for loc_id in s.location_id + ) + ] + assert len(gc_samples) > 0, "Expected at least one gc.collect wall-time sample" + + +def test_gc_alloc_samples_appear_in_profile(tmp_path: Path) -> None: + output_filename = _setup_profiler(tmp_path, "test_gc_alloc") + + col = GCCollector() + col.start() + try: + for _ in range(10): + a: dict[str, object] = {} + b: dict[str, object] = {} + a["b"] = b + b["a"] = a + del a, b + gc.collect(0) + finally: + col.stop() + + ddup.upload() + + profile = pprof_utils.parse_newest_profile(output_filename) + alloc_samples = pprof_utils.get_samples_with_value_type(profile, "alloc-space") + gc_alloc = [ + s + for s in alloc_samples + if any( + "gc.collect" in pprof_utils.get_location_from_id(profile, loc_id).function_name for loc_id in s.location_id + ) + ] + assert len(gc_alloc) > 0, "Expected at least one gc.collect alloc sample" + + +def test_gc_snapshot_sample_appears_in_profile(tmp_path: Path) -> None: + output_filename = _setup_profiler(tmp_path, "test_gc_snapshot") + + col = GCCollector() + col.start() + try: + gc.collect() + gc.collect() + col.snapshot() + finally: + col.stop() + + ddup.upload() + + profile = pprof_utils.parse_newest_profile(output_filename) + config_samples = [ + s + for s in profile.sample + if any( + "gc.config" in pprof_utils.get_location_from_id(profile, loc_id).function_name for loc_id in s.location_id + ) + ] + assert len(config_samples) > 0, "Expected gc.config snapshot sample" + + +# --------------------------------------------------------------------------- +# Profiler wiring tests +# --------------------------------------------------------------------------- + + +@pytest.mark.subprocess(env=dict(DD_PROFILING_GC_ENABLED="true")) +def test_gc_collector_in_profiler_when_enabled(): + from ddtrace.profiling import profiler + from ddtrace.profiling.collector.gc import GCCollector + + assert any(isinstance(col, GCCollector) for col in profiler.Profiler()._profiler._collectors) + + +@pytest.mark.subprocess(env=dict(DD_PROFILING_GC_ENABLED="false")) +def test_gc_collector_not_in_profiler_when_disabled(): + from ddtrace.profiling import profiler + from ddtrace.profiling.collector.gc import GCCollector + + assert all(not isinstance(col, GCCollector) for col in profiler.Profiler()._profiler._collectors) + + +def test_gc_collector_disabled_by_env(monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setenv("DD_PROFILING_GC_ENABLED", "false") + import importlib + + import ddtrace.internal.settings.profiling as prof_settings + + importlib.reload(prof_settings) + assert not prof_settings.config.gc.enabled