From 43ceea28405298881e62ee0a2e1574a4502d1e7f Mon Sep 17 00:00:00 2001 From: "Adam.Dybbroe" Date: Wed, 22 Oct 2025 10:45:23 +0200 Subject: [PATCH 1/5] Add support for picking up the generation of various output file patterns from the log output Signed-off-by: Adam.Dybbroe --- pytroll_runner/__init__.py | 20 ++++- pytroll_runner/tests/test_prepare_messages.py | 73 +++++++++++++++++-- 2 files changed, 84 insertions(+), 9 deletions(-) diff --git a/pytroll_runner/__init__.py b/pytroll_runner/__init__.py index b48e0ac..2df0bec 100644 --- a/pytroll_runner/__init__.py +++ b/pytroll_runner/__init__.py @@ -212,18 +212,30 @@ def generate_message_from_log_output(publisher_config, mda, log_output): return message +def _get_nefiles_from_regex(regex, log_output): + """Get list of new output files from log messages.""" + logger.debug(f"Matching regex-pattern: {regex} from log output") + nfiles = re.findall(regex, str(log_output, "utf-8")) + logger.debug(f"Output files identified from log output: {nfiles}") + return nfiles + + def get_newfiles_from_regex_and_logoutput(regex, log_output): """Get the filenames using a regex-pattern on the log_output.""" - logger.debug(f"Matching regex-pattern: {regex} from log output") - new_files = re.findall(regex, str(log_output, "utf-8")) - logger.debug(f"Output files identified from log output: {new_files}") + if isinstance(regex, list): + new_files = [] + for rex in regex: + nfiles = _get_nefiles_from_regex(rex, log_output) + new_files = new_files + nfiles + else: + new_files = _get_nefiles_from_regex(regex, log_output) + return new_files def generate_message_from_expected_files(pub_config, extra_metadata=None, preexisting_files=None): """Generate a message containing the expected files.""" new_files = find_new_files(pub_config, preexisting_files or set()) - return generate_message_from_new_files(pub_config, new_files, extra_metadata) diff --git a/pytroll_runner/tests/test_prepare_messages.py b/pytroll_runner/tests/test_prepare_messages.py 
index 4b371bd..c5ccc60 100644 --- a/pytroll_runner/tests/test_prepare_messages.py +++ b/pytroll_runner/tests/test_prepare_messages.py @@ -4,7 +4,7 @@ from pytroll_runner import get_newfiles_from_regex_and_logoutput, read_config -TEST_YAML_CONFIG1 = """script: /bin/awsat_l0l1b_run.sh +TEST_YAML_CONFIG_ONE_OUTPUT_FILE = """script: /bin/awsat_l0l1b_run.sh publisher_config: output_files_log_regex: "renamed .* -> '(.*.nc)" publisher_settings: @@ -24,6 +24,29 @@ addr_listener: true """ +TEST_YAML_CONFIG_MANY_OUTPUT_FILES = """ +script: + command: "/san1/opt/pps_mw_aws_runner/releases/pps_mw_aws_runner-0.0.10/bin/pps_mw.sh -p pr_hl -s aws -r W_.*.nc " + workers: 1 +publisher_config: + output_files_log_regex: + - "Has written level2 file: (.*.nc)" + - "Has saved plot file: (.*euro4.png)" + - "Has saved plot file: (.*baltrad4.png)" + publisher_settings: + name: pps_mw_aws_runner + static_metadata: + data_processing_level: 2 + type: NC + topic: /PPS-MW-NC/2/ +subscriber_config: + topics: + - /awsat/l1b/metno/oslo + - /awsat/l1b/fmi/sodankyla + nameserver: localhost + addr_listener: true +""" + EXAMPLE_LOG_OUTPUT_BYTES = (b"""\nGenerating configuration..\n\nSetting SELinux context on...\n\nFound result file:""" b"""\n\nrenamed '/san1/polar_in/direct_readout/aws/L1/W_XX-SMHI-Kangerlussuaq,SAT,AWS1-""" b"""MWR-1B-RAD_C_SMHI_20250602201120_L_D_20250602033328_20250602033625_C_N____.nc' -> '""" @@ -33,21 +56,61 @@ EXPECTED_LIST = ["/san1/polar_in/direct_readout/aws/lvl1/W_XX-SMHI-Kangerlussuaq,SAT,AWS1-MWR-1B-RAD_C_SMHI_20250602201" "120_L_D_20250602033328_20250602033625_C_N____.nc"] +LOG_OUTPUT_SEVERAL_FILES = (b"""\n[INFO: 2025-10-21 11:27:47 : pps_mw.writers.level2] Start writing level2 dataset.""" + b"""\n[INFO: 2025-10-21 11:27:47 : pps_mw.writers.level2] Has written level2 file: """ + b"""/san1/polar_out/direct_readout/lvl2/""" + b"""S_NWC_PRHL_aws1_20251021T1116140Z_20251021T1123330Z.nc""" + b"""\n[INFO: 2025-10-21 11:27:51 : pps_mw.utils.plotting] Has saved plot file: """ 
+ b"""/san1/polar_out/direct_readout/lvl2/""" + b"""quicklook_PRHL_aws1_20251021111614_20251021112333_euro4.png""" + b"""\n[INFO: 2025-10-21 11:27:52 : pps_mw.utils.plotting] Has saved plot file: """ + b"""/san1/polar_out/direct_readout/lvl2/""" + b"""quicklook_PRHL_aws1_20251021111614_20251021112333_baltrad4.png""" + b"""\n[INFO: 2025-10-21 11:27:52 : pps_mw.pges.pge_runner] Done pr_hl processing for """ + b"""/san1/polar_in/regional/aws/l1b/W_XX-FMI-Sodankyla,SAT,AWS1-MWR-1B-RAD_C_FMI_""" + b"""_20251021112652_R_D_20251021111330_20251021112402_C_N____.nc.""") + +EXPECTED_LIST2 = ["/san1/polar_out/direct_readout/lvl2/S_NWC_PRHL_aws1_20251021T1116140Z_20251021T1123330Z.nc", + "/san1/polar_out/direct_readout/lvl2/quicklook_PRHL_aws1_20251021111614_20251021112333_baltrad4.png", + "/san1/polar_out/direct_readout/lvl2/quicklook_PRHL_aws1_20251021111614_20251021112333_euro4.png" + ] + @pytest.fixture -def fake_config_yaml_file1(tmp_path): +def fake_config_yaml_one_output_file(tmp_path): """Write fake config yaml file.""" file_path = tmp_path / "some_config_file.yaml" with open(file_path, "w") as fpt: - fpt.write(TEST_YAML_CONFIG1) + fpt.write(TEST_YAML_CONFIG_ONE_OUTPUT_FILE) return file_path -def test_get_newfiles_from_regex_and_logoutput(fake_config_yaml_file1): +@pytest.fixture +def fake_config_yaml_many_output_files(tmp_path): + """Write fake config yaml file.""" + file_path = tmp_path / "some_config_many_output_files.yaml" + with open(file_path, "w") as fpt: + fpt.write(TEST_YAML_CONFIG_MANY_OUTPUT_FILES) + + return file_path + + +def test_get_newfiles_from_regex_and_logoutput(fake_config_yaml_one_output_file): """Test getting new files from regex pattern and log output.""" log_output = EXAMPLE_LOG_OUTPUT_BYTES - config = read_config(fake_config_yaml_file1) + config = read_config(fake_config_yaml_one_output_file) pattern = config[2]["output_files_log_regex"] result = get_newfiles_from_regex_and_logoutput(pattern, log_output) assert result == EXPECTED_LIST + + +def 
test_get_newfiles_from_regex_patterns_and_logoutput(fake_config_yaml_many_output_files): + """Test getting new files from regex patterns and log output.""" + log_output = LOG_OUTPUT_SEVERAL_FILES + config = read_config(fake_config_yaml_many_output_files) + + pattern = config[2]["output_files_log_regex"] + result = get_newfiles_from_regex_and_logoutput(pattern, log_output) + + assert sorted(result) == sorted(EXPECTED_LIST2) From 93c7ec2c42f86105af8566b80f969ee2cb772a6c Mon Sep 17 00:00:00 2001 From: "Adam.Dybbroe" Date: Wed, 22 Oct 2025 11:27:42 +0200 Subject: [PATCH 2/5] Parametrize tests Signed-off-by: Adam.Dybbroe --- pytroll_runner/tests/test_prepare_messages.py | 72 +++++++++---------- 1 file changed, 35 insertions(+), 37 deletions(-) diff --git a/pytroll_runner/tests/test_prepare_messages.py b/pytroll_runner/tests/test_prepare_messages.py index c5ccc60..bc4d37b 100644 --- a/pytroll_runner/tests/test_prepare_messages.py +++ b/pytroll_runner/tests/test_prepare_messages.py @@ -53,8 +53,9 @@ b"""/san1/polar_in/direct_readout/aws/lvl1/W_XX-SMHI-Kangerlussuaq,SAT,AWS1-MWR-1B-RAD_""" b"""C_SMHI_20250602201120_L_D_20250602033328_20250602033625_C_N____.nc'\n""") -EXPECTED_LIST = ["/san1/polar_in/direct_readout/aws/lvl1/W_XX-SMHI-Kangerlussuaq,SAT,AWS1-MWR-1B-RAD_C_SMHI_20250602201" - "120_L_D_20250602033328_20250602033625_C_N____.nc"] +EXPECTED_LIST_ONE = ["/san1/polar_in/direct_readout/aws/lvl1/" + "W_XX-SMHI-Kangerlussuaq,SAT,AWS1-MWR-1B-RAD_C_SMHI_20250602201" + "120_L_D_20250602033328_20250602033625_C_N____.nc"] LOG_OUTPUT_SEVERAL_FILES = (b"""\n[INFO: 2025-10-21 11:27:47 : pps_mw.writers.level2] Start writing level2 dataset.""" b"""\n[INFO: 2025-10-21 11:27:47 : pps_mw.writers.level2] Has written level2 file: """ @@ -70,47 +71,44 @@ b"""/san1/polar_in/regional/aws/l1b/W_XX-FMI-Sodankyla,SAT,AWS1-MWR-1B-RAD_C_FMI_""" b"""_20251021112652_R_D_20251021111330_20251021112402_C_N____.nc.""") -EXPECTED_LIST2 = 
["/san1/polar_out/direct_readout/lvl2/S_NWC_PRHL_aws1_20251021T1116140Z_20251021T1123330Z.nc", - "/san1/polar_out/direct_readout/lvl2/quicklook_PRHL_aws1_20251021111614_20251021112333_baltrad4.png", - "/san1/polar_out/direct_readout/lvl2/quicklook_PRHL_aws1_20251021111614_20251021112333_euro4.png" - ] - -@pytest.fixture -def fake_config_yaml_one_output_file(tmp_path): - """Write fake config yaml file.""" - file_path = tmp_path / "some_config_file.yaml" - with open(file_path, "w") as fpt: - fpt.write(TEST_YAML_CONFIG_ONE_OUTPUT_FILE) - - return file_path +EXPECTED_LIST_MANY = ["/san1/polar_out/direct_readout/lvl2/" + "S_NWC_PRHL_aws1_20251021T1116140Z_20251021T1123330Z.nc", + "/san1/polar_out/direct_readout/lvl2/" + "quicklook_PRHL_aws1_20251021111614_20251021112333_baltrad4.png", + "/san1/polar_out/direct_readout/lvl2/" + "quicklook_PRHL_aws1_20251021111614_20251021112333_euro4.png" + ] @pytest.fixture -def fake_config_yaml_many_output_files(tmp_path): - """Write fake config yaml file.""" - file_path = tmp_path / "some_config_many_output_files.yaml" - with open(file_path, "w") as fpt: - fpt.write(TEST_YAML_CONFIG_MANY_OUTPUT_FILES) - +def fake_config_yaml(tmp_path, request): + """Write fake config yaml file based on parameter.""" + if request.param == "one": + content = TEST_YAML_CONFIG_ONE_OUTPUT_FILE + filename = "config_one.yaml" + elif request.param == "many": + content = TEST_YAML_CONFIG_MANY_OUTPUT_FILES + filename = "config_many.yaml" + else: + raise ValueError(f"Unknown param: {request.param}") + + file_path = tmp_path / filename + file_path.write_text(content) return file_path -def test_get_newfiles_from_regex_and_logoutput(fake_config_yaml_one_output_file): - """Test getting new files from regex pattern and log output.""" - log_output = EXAMPLE_LOG_OUTPUT_BYTES - config = read_config(fake_config_yaml_one_output_file) - - pattern = config[2]["output_files_log_regex"] - result = get_newfiles_from_regex_and_logoutput(pattern, log_output) - assert result == 
EXPECTED_LIST - - -def test_get_newfiles_from_regex_patterns_and_logoutput(fake_config_yaml_many_output_files): - """Test getting new files from regex patterns and log output.""" - log_output = LOG_OUTPUT_SEVERAL_FILES - config = read_config(fake_config_yaml_many_output_files) - +@pytest.mark.parametrize( + ("fake_config_yaml", "log_output", "expected_list"), + [ + ("one", EXAMPLE_LOG_OUTPUT_BYTES, EXPECTED_LIST_ONE), + ("many", LOG_OUTPUT_SEVERAL_FILES, EXPECTED_LIST_MANY), + ], + indirect=["fake_config_yaml"] +) +def test_get_newfiles_from_regex_and_logoutput(fake_config_yaml, log_output, expected_list): + """Test getting new files from regex pattern(s) and log output.""" + config = read_config(fake_config_yaml) pattern = config[2]["output_files_log_regex"] result = get_newfiles_from_regex_and_logoutput(pattern, log_output) - assert sorted(result) == sorted(EXPECTED_LIST2) + assert sorted(result) == sorted(expected_list) From d4894f71b7cb08741aa0e279c79afc9c10c5e7b1 Mon Sep 17 00:00:00 2001 From: Adam Dybbroe Date: Wed, 22 Oct 2025 12:21:02 +0200 Subject: [PATCH 3/5] Update pytroll_runner/__init__.py Co-authored-by: Martin Raspaud --- pytroll_runner/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pytroll_runner/__init__.py b/pytroll_runner/__init__.py index 2df0bec..1202f7c 100644 --- a/pytroll_runner/__init__.py +++ b/pytroll_runner/__init__.py @@ -212,7 +212,7 @@ def generate_message_from_log_output(publisher_config, mda, log_output): return message -def _get_nefiles_from_regex(regex, log_output): +def _get_newfiles_from_regex(regex, log_output): """Get list of new output files from log messages.""" logger.debug(f"Matching regex-pattern: {regex} from log output") nfiles = re.findall(regex, str(log_output, "utf-8")) From 9d3a48b5a4aaf96067e965264045ac5d501c9da4 Mon Sep 17 00:00:00 2001 From: "Adam.Dybbroe" Date: Wed, 22 Oct 2025 12:22:26 +0200 Subject: [PATCH 4/5] Bugfix (typo) Signed-off-by: Adam.Dybbroe --- 
pytroll_runner/__init__.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pytroll_runner/__init__.py b/pytroll_runner/__init__.py index 1202f7c..3c0a2f9 100644 --- a/pytroll_runner/__init__.py +++ b/pytroll_runner/__init__.py @@ -225,10 +225,10 @@ def get_newfiles_from_regex_and_logoutput(regex, log_output): if isinstance(regex, list): new_files = [] for rex in regex: - nfiles = _get_nefiles_from_regex(rex, log_output) + nfiles = _get_newfiles_from_regex(rex, log_output) new_files = new_files + nfiles else: - new_files = _get_nefiles_from_regex(regex, log_output) + new_files = _get_newfiles_from_regex(regex, log_output) return new_files From d2fcf19bbb3ebe9e3ebc9e93eac010839c3b047e Mon Sep 17 00:00:00 2001 From: "Adam.Dybbroe" Date: Wed, 22 Oct 2025 14:11:04 +0200 Subject: [PATCH 5/5] Make it possible to control having one message per output file or one message with all files as a dataset Signed-off-by: Adam.Dybbroe --- pytroll_runner/__init__.py | 24 +++++++++++++++++------- pytroll_runner/tests/test_runner.py | 22 ++++++++++++++++------ 2 files changed, 33 insertions(+), 13 deletions(-) diff --git a/pytroll_runner/__init__.py b/pytroll_runner/__init__.py index 3c0a2f9..20b4fbb 100644 --- a/pytroll_runner/__init__.py +++ b/pytroll_runner/__init__.py @@ -88,9 +88,10 @@ def run_and_publish(config_file: Path, message_file: str | None = None): gen = run_from_message_file(command_to_call, message_file) for log_output, mda in gen: try: - message, preexisting_files = generate_message(publisher_config, mda, log_output, preexisting_files) - logger.debug(f"Sending message = {message}") - pub.send(str(message)) + messages, preexisting_files = generate_message(publisher_config, mda, log_output, preexisting_files) + for message in messages: + logger.debug(f"Sending message = {message}") + pub.send(str(message)) except FileNotFoundError: logger.debug("We could not find any new files, so no message will be sent.") @@ -98,12 +99,13 @@ def
run_and_publish(config_file: Path, message_file: str | None = None): def generate_message(publisher_config, mda, log_output, preexisting_files): """Generate message from either the log output or existing files.""" try: - message = generate_message_from_log_output(publisher_config, mda, log_output) + messages = generate_message_from_log_output(publisher_config, mda, log_output) except KeyError: message = generate_message_from_expected_files(publisher_config, mda, preexisting_files) preexisting_files = check_existing_files(publisher_config) + messages = [message] - return message, preexisting_files + return messages, preexisting_files def run_from_message_file(command_to_call, message_file): @@ -208,8 +210,15 @@ def run_on_files(command: str, files: list[str]) -> bytes | None: def generate_message_from_log_output(publisher_config, mda, log_output): """Generate message for the filenames present in the log output.""" new_files = get_newfiles_from_regex_and_logoutput(publisher_config["output_files_log_regex"], log_output) - message = generate_message_from_new_files(publisher_config, new_files, mda) - return message + split_files = publisher_config.get("split_files") + messages = [] + if len(new_files) > 1 and split_files: + for newfile in new_files: + messages.append(generate_message_from_new_files(publisher_config, [newfile], mda)) + else: + messages.append(generate_message_from_new_files(publisher_config, new_files, mda)) + + return messages def _get_newfiles_from_regex(regex, log_output): @@ -245,6 +254,7 @@ def generate_message_from_new_files(pub_config, new_files, extra_metadata): raise FileNotFoundError("No new files were found.") metadata = populate_metadata(extra_metadata, pub_config.get("static_metadata", {})) + dataset = [] for filepath in sorted(new_files): filename = os.path.basename(filepath) diff --git a/pytroll_runner/tests/test_runner.py b/pytroll_runner/tests/test_runner.py index e5f78b1..44f9d61 100644 --- a/pytroll_runner/tests/test_runner.py +++ 
b/pytroll_runner/tests/test_runner.py @@ -64,6 +64,7 @@ def script_aws(redirection_specification): 2023-08-17T09:48:46.321622 fe5e1feebbfb IPF-AWS-L1 01.00 [000000000045]: [P] STEP 6: Writing AWS L1 Output (elapsed 0.373 seconds) 2023-08-17T09:48:46.329884 fe5e1feebbfb IPF-AWS-L1 01.00 [000000000045]: [I] lib4eo::NcMap: Loading map file '/local_disk/aws_test/conf/L1/AWS-L1B-RAD.xsd' 2023-08-17T09:48:46.578498 fe5e1feebbfb IPF-AWS-L1 01.00 [000000000045]: [I] Written output file : /local_disk/aws_test/test/RAD_AWS_1B/W_XX-OHB-Unknown,SAT,1-AWS-1B-RAD_C_OHB_20230817094846_G_D_20220621090100_20220621090618_T_B____.nc +2023-08-17T09:48:46.578498 fe5e1feebbfb IPF-AWS-L1 01.00 [000000000045]: [I] Written output file : /local_disk/aws_test/test/RAD_AWS_1B/some_other_file.nc 2023-08-17T09:48:46.588984 fe5e1feebbfb IPF-AWS-L1 01.00 [000000000045]: [P] STEP 7: Exiting (elapsed 0.640 seconds) 2023-08-17T09:48:46.589031 fe5e1feebbfb IPF-AWS-L1 01.00 [000000000045]: [I] IPF-AWS-L1 v1.0.1 processor ending with success 2023-08-17T09:48:46.589041 fe5e1feebbfb IPF-AWS-L1 01.00 [000000000045]: [I] Exiting with EXIT CODE 0"{redirection_specification} @@ -174,7 +175,8 @@ def config_aws(command_aws): sub_config = dict(nameserver=False, addresses=["ipc://bla"]) pub_config = dict(publisher_settings=dict(nameservers=False, port=1979), topic="/hi/there", - output_files_log_regex="Written output file : (.*.nc)") + output_files_log_regex="Written output file : (.*.nc)", + split_files=True) command_path = os.fspath(command_aws) test_config = dict(subscriber_config=sub_config, script=command_path, @@ -473,14 +475,21 @@ def test_run_and_publish_with_files_from_log(tmp_path, config_file_aws): data = {"dataset": [{"uri": os.fspath(tmp_path / f), "uid": f} for f in some_files]} first_message = Message("some_topic", "dataset", data=data) - expected = "W_XX-OHB-Unknown,SAT,1-AWS-1B-RAD_C_OHB_20230817094846_G_D_20220621090100_20220621090618_T_B____.nc" + expected1 = 
"W_XX-OHB-Unknown,SAT,1-AWS-1B-RAD_C_OHB_20230817094846_G_D_20220621090100_20220621090618_T_B____.nc" + expected2 = "some_other_file.nc" + msg_id = 0 + message = {} with patched_subscriber_recv([first_message]): with patched_publisher() as published_messages: run_and_publish(config_file_aws) - assert len(published_messages) == 1 - message = Message(rawstr=published_messages[0]) - assert message.data["uri"] == "/local_disk/aws_test/test/RAD_AWS_1B/" + expected + assert len(published_messages) == 2 + message[f"message{msg_id}"] = Message(rawstr=published_messages[msg_id]) + msg_id = msg_id + 1 + message[f"message{msg_id}"] = Message(rawstr=published_messages[msg_id]) + + assert message["message0"].data["uri"] == "/local_disk/aws_test/test/RAD_AWS_1B/" + expected1 + assert message["message1"].data["uri"] == "/local_disk/aws_test/test/RAD_AWS_1B/" + expected2 def test_run_and_no_publish_when_regex_unmatched(tmp_path, config_aws, caplog): @@ -602,11 +611,12 @@ def test_run_and_publish_from_message_file(tmp_path, config_file_aws): message_file = tmp_path / "first.msg" with open(message_file, mode="w") as fd: fd.write(str(first_message)) + expected = "W_XX-OHB-Unknown,SAT,1-AWS-1B-RAD_C_OHB_20230817094846_G_D_20220621090100_20220621090618_T_B____.nc" with patched_publisher() as published_messages: main([str(config_file_aws), "-m", str(message_file)]) - assert len(published_messages) == 1 + assert len(published_messages) == 2 message = Message(rawstr=published_messages[0]) assert message.data["uri"] == "/local_disk/aws_test/test/RAD_AWS_1B/" + expected