diff --git a/nettacker/core/utils/common.py b/nettacker/core/utils/common.py
index 5e616ae0a..c5384b41d 100644
--- a/nettacker/core/utils/common.py
+++ b/nettacker/core/utils/common.py
@@ -31,7 +31,19 @@ def replace_dependent_response(log, response_dependent):
     return log
 
 
-def merge_logs_to_list(result, log_list=[]):
+def merge_logs_to_list(result, log_list=None):
+    """Recursively extract all 'log' values from a nested dict into a flat deduplicated list.
+
+    Args:
+        result: A dict (possibly nested) containing 'log' keys to extract.
+        log_list: Accumulator list for recursive calls. Defaults to a new empty list
+            on each top-level call to avoid mutable default argument pitfalls.
+
+    Returns:
+        A deduplicated list of extracted log values.
+    """
+    if log_list is None:
+        log_list = []
     if isinstance(result, dict):
         if "json_event" in list(result.keys()):
             if not isinstance(result["json_event"], dict):
diff --git a/tests/core/utils/test_common.py b/tests/core/utils/test_common.py
index 5e4ced398..1185094b3 100644
--- a/tests/core/utils/test_common.py
+++ b/tests/core/utils/test_common.py
@@ -96,6 +96,43 @@ def test_select_maximum_cpu_core(cpu_count_mock):
     assert common_utils.select_maximum_cpu_core("invalid") == 1
 
 
+def test_merge_logs_to_list_simple():
+    result = {"log": "error occurred"}
+    assert common_utils.merge_logs_to_list(result) == ["error occurred"]
+
+
+def test_merge_logs_to_list_nested():
+    result = {
+        "log": "outer",
+        "nested": {"log": "inner"},
+    }
+    logs = common_utils.merge_logs_to_list(result)
+    assert sorted(logs) == ["inner", "outer"]
+
+
+def test_merge_logs_to_list_no_log_key():
+    result = {"status": "ok", "data": {"value": 42}}
+    assert common_utils.merge_logs_to_list(result) == []
+
+
+def test_merge_logs_to_list_deduplicates():
+    result = {
+        "log": "same",
+        "nested": {"log": "same"},
+    }
+    assert common_utils.merge_logs_to_list(result) == ["same"]
+
+
+def test_merge_logs_to_list_no_shared_state_between_calls():
+    """Verify that consecutive calls without explicit log_list don't leak state."""
+    result_a = {"log": "first"}
+    result_b = {"log": "second"}
+    logs_a = common_utils.merge_logs_to_list(result_a)
+    logs_b = common_utils.merge_logs_to_list(result_b)
+    assert logs_a == ["first"]
+    assert logs_b == ["second"]
+
+
 def test_wait_for_threads_to_finish_all_dead():
     """All threads already finished -- should return True immediately."""
     t = MagicMock(spec=threading.Thread)