Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 33 additions & 11 deletions pytroll_runner/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,22 +88,24 @@ def run_and_publish(config_file: Path, message_file: str | None = None):
gen = run_from_message_file(command_to_call, message_file)
for log_output, mda in gen:
try:
message, preexisting_files = generate_message(publisher_config, mda, log_output, preexisting_files)
logger.debug(f"Sending message = {message}")
pub.send(str(message))
messages, preexisting_files = generate_message(publisher_config, mda, log_output, preexisting_files)
for message in messages:
logger.debug(f"Sending message = {message}")
pub.send(str(message))
except FileNotFoundError:
logger.debug("We could not find any new files, so no message will be sent.")


def generate_message(publisher_config, mda, log_output, preexisting_files):
"""Generate message from either the log output or existing files."""
try:
message = generate_message_from_log_output(publisher_config, mda, log_output)
messages = generate_message_from_log_output(publisher_config, mda, log_output)
except KeyError:
message = generate_message_from_expected_files(publisher_config, mda, preexisting_files)
preexisting_files = check_existing_files(publisher_config)
messages = [message]

return message, preexisting_files
return messages, preexisting_files


def run_from_message_file(command_to_call, message_file):
Expand Down Expand Up @@ -208,22 +210,41 @@ def run_on_files(command: str, files: list[str]) -> bytes | None:
def generate_message_from_log_output(publisher_config, mda, log_output):
"""Generate message for the filenames present in the log output."""
new_files = get_newfiles_from_regex_and_logoutput(publisher_config["output_files_log_regex"], log_output)
message = generate_message_from_new_files(publisher_config, new_files, mda)
return message
split_files = publisher_config.get("split_files")
messages = []
if len(new_files) > 1 and split_files:
for newfile in new_files:
messages.append(generate_message_from_new_files(publisher_config, [newfile], mda))
else:
messages.append(generate_message_from_new_files(publisher_config, new_files, mda))

return messages


def _get_newfiles_from_regex(regex, log_output):
"""Get list of new output files from log messages."""
logger.debug(f"Matching regex-pattern: {regex} from log output")
nfiles = re.findall(regex, str(log_output, "utf-8"))
logger.debug(f"Output files identified from log output: {nfiles}")
return nfiles


def get_newfiles_from_regex_and_logoutput(regex, log_output):
"""Get the filenames using a regex-pattern on the log_output."""
logger.debug(f"Matching regex-pattern: {regex} from log output")
new_files = re.findall(regex, str(log_output, "utf-8"))
logger.debug(f"Output files identified from log output: {new_files}")
if isinstance(regex, list):
new_files = []
for rex in regex:
nfiles = _get_newfiles_from_regex(rex, log_output)
new_files = new_files + nfiles
else:
new_files = _get_newfiles_from_regex(regex, log_output)

return new_files


def generate_message_from_expected_files(pub_config, extra_metadata=None, preexisting_files=None):
"""Generate a message containing the expected files."""
new_files = find_new_files(pub_config, preexisting_files or set())

return generate_message_from_new_files(pub_config, new_files, extra_metadata)


Expand All @@ -233,6 +254,7 @@ def generate_message_from_new_files(pub_config, new_files, extra_metadata):
raise FileNotFoundError("No new files were found.")

metadata = populate_metadata(extra_metadata, pub_config.get("static_metadata", {}))

dataset = []
for filepath in sorted(new_files):
filename = os.path.basename(filepath)
Expand Down
89 changes: 75 additions & 14 deletions pytroll_runner/tests/test_prepare_messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

from pytroll_runner import get_newfiles_from_regex_and_logoutput, read_config

TEST_YAML_CONFIG1 = """script: /bin/awsat_l0l1b_run.sh
TEST_YAML_CONFIG_ONE_OUTPUT_FILE = """script: /bin/awsat_l0l1b_run.sh
publisher_config:
output_files_log_regex: "renamed .* -> '(.*.nc)"
publisher_settings:
Expand All @@ -24,30 +24,91 @@
addr_listener: true
"""

TEST_YAML_CONFIG_MANY_OUTPUT_FILES = """
script:
command: "/san1/opt/pps_mw_aws_runner/releases/pps_mw_aws_runner-0.0.10/bin/pps_mw.sh -p pr_hl -s aws -r W_.*.nc "
workers: 1
publisher_config:
output_files_log_regex:
- "Has written level2 file: (.*.nc)"
- "Has saved plot file: (.*euro4.png)"
- "Has saved plot file: (.*baltrad4.png)"
publisher_settings:
name: pps_mw_aws_runner
static_metadata:
data_processing_level: 2
type: NC
topic: /PPS-MW-NC/2/
subscriber_config:
topics:
- /awsat/l1b/metno/oslo
- /awsat/l1b/fmi/sodankyla
nameserver: localhost
addr_listener: true
"""

EXAMPLE_LOG_OUTPUT_BYTES = (b"""\nGenerating configuration..\n\nSetting SELinux context on...\n\nFound result file:"""
b"""\n\nrenamed '/san1/polar_in/direct_readout/aws/L1/W_XX-SMHI-Kangerlussuaq,SAT,AWS1-"""
b"""MWR-1B-RAD_C_SMHI_20250602201120_L_D_20250602033328_20250602033625_C_N____.nc' -> '"""
b"""/san1/polar_in/direct_readout/aws/lvl1/W_XX-SMHI-Kangerlussuaq,SAT,AWS1-MWR-1B-RAD_"""
b"""C_SMHI_20250602201120_L_D_20250602033328_20250602033625_C_N____.nc'\n""")

EXPECTED_LIST = ["/san1/polar_in/direct_readout/aws/lvl1/W_XX-SMHI-Kangerlussuaq,SAT,AWS1-MWR-1B-RAD_C_SMHI_20250602201"
"120_L_D_20250602033328_20250602033625_C_N____.nc"]
EXPECTED_LIST_ONE = ["/san1/polar_in/direct_readout/aws/lvl1/"
"W_XX-SMHI-Kangerlussuaq,SAT,AWS1-MWR-1B-RAD_C_SMHI_20250602201"
"120_L_D_20250602033328_20250602033625_C_N____.nc"]

LOG_OUTPUT_SEVERAL_FILES = (b"""\n[INFO: 2025-10-21 11:27:47 : pps_mw.writers.level2] Start writing level2 dataset."""
b"""\n[INFO: 2025-10-21 11:27:47 : pps_mw.writers.level2] Has written level2 file: """
b"""/san1/polar_out/direct_readout/lvl2/"""
b"""S_NWC_PRHL_aws1_20251021T1116140Z_20251021T1123330Z.nc"""
b"""\n[INFO: 2025-10-21 11:27:51 : pps_mw.utils.plotting] Has saved plot file: """
b"""/san1/polar_out/direct_readout/lvl2/"""
b"""quicklook_PRHL_aws1_20251021111614_20251021112333_euro4.png"""
b"""\n[INFO: 2025-10-21 11:27:52 : pps_mw.utils.plotting] Has saved plot file: """
b"""/san1/polar_out/direct_readout/lvl2/"""
b"""quicklook_PRHL_aws1_20251021111614_20251021112333_baltrad4.png"""
b"""\n[INFO: 2025-10-21 11:27:52 : pps_mw.pges.pge_runner] Done pr_hl processing for """
b"""/san1/polar_in/regional/aws/l1b/W_XX-FMI-Sodankyla,SAT,AWS1-MWR-1B-RAD_C_FMI_"""
b"""_20251021112652_R_D_20251021111330_20251021112402_C_N____.nc.""")

EXPECTED_LIST_MANY = ["/san1/polar_out/direct_readout/lvl2/"
"S_NWC_PRHL_aws1_20251021T1116140Z_20251021T1123330Z.nc",
"/san1/polar_out/direct_readout/lvl2/"
"quicklook_PRHL_aws1_20251021111614_20251021112333_baltrad4.png",
"/san1/polar_out/direct_readout/lvl2/"
"quicklook_PRHL_aws1_20251021111614_20251021112333_euro4.png"
]


@pytest.fixture
def fake_config_yaml_file1(tmp_path):
"""Write fake config yaml file."""
file_path = tmp_path / "some_config_file.yaml"
with open(file_path, "w") as fpt:
fpt.write(TEST_YAML_CONFIG1)
def fake_config_yaml(tmp_path, request):
"""Write fake config yaml file based on parameter."""
if request.param == "one":
content = TEST_YAML_CONFIG_ONE_OUTPUT_FILE
filename = "config_one.yaml"
elif request.param == "many":
content = TEST_YAML_CONFIG_MANY_OUTPUT_FILES
filename = "config_many.yaml"
else:
raise ValueError(f"Unknown param: {request.param}")

file_path = tmp_path / filename
file_path.write_text(content)
return file_path


def test_get_newfiles_from_regex_and_logoutput(fake_config_yaml_file1):
"""Test getting new files from regex pattern and log output."""
log_output = EXAMPLE_LOG_OUTPUT_BYTES
config = read_config(fake_config_yaml_file1)

@pytest.mark.parametrize(
("fake_config_yaml", "log_output", "expected_list"),
[
("one", EXAMPLE_LOG_OUTPUT_BYTES, EXPECTED_LIST_ONE),
("many", LOG_OUTPUT_SEVERAL_FILES, EXPECTED_LIST_MANY),
],
indirect=["fake_config_yaml"]
)
def test_get_newfiles_from_regex_and_logoutput(fake_config_yaml, log_output, expected_list):
"""Test getting new files from regex pattern(s) and log output."""
config = read_config(fake_config_yaml)
pattern = config[2]["output_files_log_regex"]
result = get_newfiles_from_regex_and_logoutput(pattern, log_output)
assert result == EXPECTED_LIST

assert sorted(result) == sorted(expected_list)
22 changes: 16 additions & 6 deletions pytroll_runner/tests/test_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ def script_aws(redirection_specification):
2023-08-17T09:48:46.321622 fe5e1feebbfb IPF-AWS-L1 01.00 [000000000045]: [P] STEP 6: Writing AWS L1 Output (elapsed 0.373 seconds)
2023-08-17T09:48:46.329884 fe5e1feebbfb IPF-AWS-L1 01.00 [000000000045]: [I] lib4eo::NcMap: Loading map file '/local_disk/aws_test/conf/L1/AWS-L1B-RAD.xsd'
2023-08-17T09:48:46.578498 fe5e1feebbfb IPF-AWS-L1 01.00 [000000000045]: [I] Written output file : /local_disk/aws_test/test/RAD_AWS_1B/W_XX-OHB-Unknown,SAT,1-AWS-1B-RAD_C_OHB_20230817094846_G_D_20220621090100_20220621090618_T_B____.nc
2023-08-17T09:48:46.578498 fe5e1feebbfb IPF-AWS-L1 01.00 [000000000045]: [I] Written output file : /local_disk/aws_test/test/RAD_AWS_1B/some_other_file.nc
2023-08-17T09:48:46.588984 fe5e1feebbfb IPF-AWS-L1 01.00 [000000000045]: [P] STEP 7: Exiting (elapsed 0.640 seconds)
2023-08-17T09:48:46.589031 fe5e1feebbfb IPF-AWS-L1 01.00 [000000000045]: [I] IPF-AWS-L1 v1.0.1 processor ending with success
2023-08-17T09:48:46.589041 fe5e1feebbfb IPF-AWS-L1 01.00 [000000000045]: [I] Exiting with EXIT CODE 0"{redirection_specification}
Expand Down Expand Up @@ -174,7 +175,8 @@ def config_aws(command_aws):
sub_config = dict(nameserver=False, addresses=["ipc://bla"])
pub_config = dict(publisher_settings=dict(nameservers=False, port=1979),
topic="/hi/there",
output_files_log_regex="Written output file : (.*.nc)")
output_files_log_regex="Written output file : (.*.nc)",
split_files=True)
command_path = os.fspath(command_aws)
test_config = dict(subscriber_config=sub_config,
script=command_path,
Expand Down Expand Up @@ -473,14 +475,21 @@ def test_run_and_publish_with_files_from_log(tmp_path, config_file_aws):
data = {"dataset": [{"uri": os.fspath(tmp_path / f), "uid": f} for f in some_files]}
first_message = Message("some_topic", "dataset", data=data)

expected = "W_XX-OHB-Unknown,SAT,1-AWS-1B-RAD_C_OHB_20230817094846_G_D_20220621090100_20220621090618_T_B____.nc"
expected1 = "W_XX-OHB-Unknown,SAT,1-AWS-1B-RAD_C_OHB_20230817094846_G_D_20220621090100_20220621090618_T_B____.nc"
expected2 = "some_other_file.nc"

msg_id = 0
message = {}
with patched_subscriber_recv([first_message]):
with patched_publisher() as published_messages:
run_and_publish(config_file_aws)
assert len(published_messages) == 1
message = Message(rawstr=published_messages[0])
assert message.data["uri"] == "/local_disk/aws_test/test/RAD_AWS_1B/" + expected
assert len(published_messages) == 2
message[f"message{msg_id}"] = Message(rawstr=published_messages[msg_id])
msg_id = msg_id + 1
message[f"message{msg_id}"] = Message(rawstr=published_messages[msg_id])

assert message["message0"].data["uri"] == "/local_disk/aws_test/test/RAD_AWS_1B/" + expected1
assert message["message1"].data["uri"] == "/local_disk/aws_test/test/RAD_AWS_1B/" + expected2


def test_run_and_no_publish_when_regex_unmatched(tmp_path, config_aws, caplog):
Expand Down Expand Up @@ -602,11 +611,12 @@ def test_run_and_publish_from_message_file(tmp_path, config_file_aws):
message_file = tmp_path / "first.msg"
with open(message_file, mode="w") as fd:
fd.write(str(first_message))

expected = "W_XX-OHB-Unknown,SAT,1-AWS-1B-RAD_C_OHB_20230817094846_G_D_20220621090100_20220621090618_T_B____.nc"

with patched_publisher() as published_messages:
main([str(config_file_aws), "-m", str(message_file)])
assert len(published_messages) == 1
assert len(published_messages) == 2
message = Message(rawstr=published_messages[0])

assert message.data["uri"] == "/local_disk/aws_test/test/RAD_AWS_1B/" + expected
Loading