Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion planemo/galaxy/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -934,7 +934,7 @@ def _install_workflow(self, runnable):
# TODO: Allow serialization so this doesn't need to assume a
# shared filesystem with Galaxy server.
from_path = default_from_path or (runnable.type.name == "cwl_workflow")
workflow = import_workflow(runnable.path, admin_gi=self.gi, user_gi=self.user_gi, from_path=from_path)
workflow = import_workflow(runnable.path, user_gi=self.user_gi, from_path=from_path)
self._workflow_ids[runnable.path] = workflow["id"]

def workflow_id_for_runnable(self, runnable):
Expand Down
30 changes: 9 additions & 21 deletions planemo/galaxy/workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,6 @@
shed_tools,
)
from gxformat2.converter import python_to_workflow
from gxformat2.interface import (
BioBlendImporterGalaxyInterface,
ImporterGalaxyInterface,
)
from gxformat2.normalize import (
inputs_normalized,
outputs_normalized,
Expand Down Expand Up @@ -510,31 +506,26 @@ def install_shed_repos_for_workflow_id(
os.unlink(wf_path)


def import_workflow(path, admin_gi, user_gi, from_path=False):
def import_workflow(path, user_gi, from_path=False):
"""Import a workflow path to specified Galaxy instance."""
if not from_path:
importer = BioBlendImporterGalaxyInterface(admin_gi=admin_gi, user_gi=user_gi)
workflow = _raw_dict(path, importer)
workflow = _raw_dict(path)
return user_gi.workflows.import_workflow_dict(workflow)
else:
path = os.path.abspath(path)
workflow = user_gi.workflows.import_workflow_from_local_path(path)
return workflow


def _raw_dict(path, importer=None):
def _raw_dict(path):
if path.endswith(".ga"):
with open(path) as f:
workflow = json.load(f)
else:
if importer is None:
importer = DummyImporterGalaxyInterface()

workflow_directory = os.path.dirname(path)
workflow_directory = os.path.abspath(workflow_directory)
workflow_directory = os.path.abspath(os.path.dirname(path))
with open(path) as f:
workflow = yaml.safe_load(f)
workflow = python_to_workflow(workflow, importer, workflow_directory)
workflow = python_to_workflow(workflow, workflow_directory=workflow_directory)

return workflow

Expand Down Expand Up @@ -598,11 +589,6 @@ def describe_outputs(runnable, gi=None):
return outputs


class DummyImporterGalaxyInterface(ImporterGalaxyInterface):
def import_workflow(self, workflow, **kwds):
return None


def input_labels(workflow_path):
"""Get normalized labels for workflow artifact regardless of format."""
steps = inputs_normalized(workflow_path=workflow_path)
Expand Down Expand Up @@ -685,7 +671,7 @@ def job_template(workflow_path, **kwds):
}
],
}
elif input_type in ["string", "int", "float", "boolean", "color"]:
elif input_type in ["string", "text", "int", "integer", "long", "float", "double", "boolean", "color"]:
template[i_label] = "todo_param_value"
else:
template[i_label] = {
Expand Down Expand Up @@ -761,6 +747,8 @@ def _build_template_and_metadata_from_inputs(
default_value = input_step.get("default")
has_default = default_value is not None
input_format = input_step.get("format", "")
if isinstance(input_format, list):
input_format = ", ".join(input_format)
collection_type = input_step.get("collection_type", "")

# Store metadata for this input
Expand All @@ -785,7 +773,7 @@ def _build_template_and_metadata_from_inputs(
"collection_type": coll_type,
"elements": _collection_elements_for_type(coll_type),
}
elif input_type in ["string", "int", "float", "boolean", "color"]:
elif input_type in ["string", "text", "int", "integer", "long", "float", "double", "boolean", "color"]:
# Use default value if available, otherwise use placeholder or false for booleans
if has_default:
template[i_label] = default_value
Expand Down
128 changes: 36 additions & 92 deletions planemo/workflow_lint.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
import inspect
import json
import os
import re
from collections import OrderedDict
from typing import (
Any,
Dict,
Expand All @@ -13,7 +11,6 @@
Tuple,
TYPE_CHECKING,
)
from urllib.parse import urlparse

import requests
import yaml
Expand All @@ -24,8 +21,11 @@
from galaxy.tool_util.parser.yaml import __to_test_assert_list
from galaxy.tool_util.verify import asserts
from gxformat2.lint import (
lint_format2,
lint_ga,
lint_best_practices_format2,
lint_best_practices_ga,
lint_format2_path,
lint_ga_path,
lint_pydantic_validation,
)
from gxformat2.yaml import ordered_load

Expand Down Expand Up @@ -59,6 +59,17 @@ class WorkflowLintContext(LintContext):
# from click arguments.
training_topic = None

def warn(self, message, linter=None, *args, **kwargs):
# gxformat2 lint rules pass Linter subclasses; galaxy LintMessage expects a name string.
if isinstance(linter, type):
linter = linter.__name__
super().warn(message, linter, *args, **kwargs)

def error(self, message, linter=None, *args, **kwargs):
if isinstance(linter, type):
linter = linter.__name__
super().error(message, linter, *args, **kwargs)


def build_wf_lint_args(ctx: "PlanemoCliContext", **kwds) -> Dict[str, Any]:
lint_args = build_lint_args(ctx, **kwds)
Expand Down Expand Up @@ -158,15 +169,8 @@ def _lint_workflow_artifacts_on_path(lint_context: WorkflowLintContext, path: st
)

elif looks_like_a_workflow(potential_workflow_artifact_path):

def structure(path, lint_context):
with open(path) as f:
workflow_dict = ordered_load(f)
workflow_class = workflow_dict.get("class")
lint_func = lint_format2 if workflow_class == "GalaxyWorkflow" else lint_ga
lint_func(lint_context, workflow_dict, path=path)

lint_context.lint("lint_structure", structure, potential_workflow_artifact_path)
lint_context.lint("lint_structure", _lint_structure, potential_workflow_artifact_path)
lint_context.lint("lint_schema_validation", _lint_schema_validation, potential_workflow_artifact_path)
if lint_args["iwc_grade"]:
lint_context.lint("lint_release", _lint_release, potential_workflow_artifact_path)
lint_context.lint("lint_best_practices", _lint_best_practices, potential_workflow_artifact_path)
Expand Down Expand Up @@ -200,91 +204,31 @@ def _lint_tsts(path: str, lint_context: WorkflowLintContext) -> None:
lint_context.valid(f"Tests appear structurally correct for {runnable.path}")


def _lint_best_practices(path: str, lint_context: WorkflowLintContext) -> None: # noqa: C901
"""
This function duplicates the checks made by Galaxy's best practices panel:
https://github.com/galaxyproject/galaxy/blob/5396bb15fe8cfcf2e89d46c1d061c49b60e2f0b1/client/src/components/Workflow/Editor/Lint.vue
"""

def check_json_for_untyped_params(j):
values = j.values() if isinstance(j, dict) else j
for value in values:
if type(value) in [list, dict, OrderedDict]:
if check_json_for_untyped_params(value):
return True
elif isinstance(value, str):
if re.match(r"\$\{.+?\}", value):
return True
return False
def _lint_structure(path: str, lint_context: WorkflowLintContext) -> None:
workflow_dict = _load_workflow_dict(path)
if workflow_dict.get("class") == "GalaxyWorkflow":
lint_format2_path(lint_context, path)
else:
lint_ga_path(lint_context, path)

with open(path) as f:
workflow_dict = ordered_load(f)

steps = workflow_dict.get("steps", {})
def _lint_schema_validation(path: str, lint_context: WorkflowLintContext) -> None:
workflow_dict = _load_workflow_dict(path)
is_format2 = workflow_dict.get("class") == "GalaxyWorkflow"
lint_pydantic_validation(lint_context, workflow_dict, format2=is_format2)

# annotation
if not workflow_dict.get("annotation"):
lint_context.warn("Workflow is not annotated.")

# creator
creators = workflow_dict.get("creator", [])
if not len(creators) > 0:
lint_context.warn("Workflow does not specify a creator.")
def _lint_best_practices(path: str, lint_context: WorkflowLintContext) -> None:
workflow_dict = _load_workflow_dict(path)
if workflow_dict.get("class") == "GalaxyWorkflow":
lint_best_practices_format2(lint_context, workflow_dict)
else:
if not isinstance(creators, list):
# Don't know if this can happen, if we implement schema validation on the Galaxy side
# this won't be needed.
creators = [creators]
for creator in creators:
if creator.get("class", "").lower() == "person" and "identifier" in creator:
identifier = creator["identifier"]
parsed_url = urlparse(identifier)
if not parsed_url.scheme:
lint_context.warn(
f'Creator identifier "{identifier}" should be a fully qualified URI, for example "https://orcid.org/0000-0002-1825-0097".'
)

# license
if not workflow_dict.get("license"):
lint_context.warn("Workflow does not specify a license.")

# checks on individual steps
for step in steps.values():
# disconnected inputs
if step.get("type") not in ["data_collection_input", "parameter_input"]:
for input in step.get("inputs", []):
if input.get("name") not in step.get("input_connections"): # TODO: check optional
lint_context.warn(
f"Input {input.get('name')} of workflow step {step.get('annotation') or step.get('id')} is disconnected."
)

# missing metadata
if not step.get("annotation"):
lint_context.warn(f"Workflow step with ID {step.get('id')} has no annotation.")
if not step.get("label"):
lint_context.warn(f"Workflow step with ID {step.get('id')} has no label.")

# untyped parameters
if workflow_dict.get("class") == "GalaxyWorkflow":
tool_state = step.get("tool_state", {})
pjas = step.get("out", {})
else:
raw_tool_state = step.get("tool_state", {})
if isinstance(raw_tool_state, str):
tool_state = json.loads(raw_tool_state)
else:
tool_state = raw_tool_state
pjas = step.get("post_job_actions", {})
lint_best_practices_ga(lint_context, workflow_dict)

if check_json_for_untyped_params(tool_state):
lint_context.warn(f"Workflow step with ID {step.get('id')} specifies an untyped parameter as an input.")

if check_json_for_untyped_params(pjas):
lint_context.warn(
f"Workflow step with ID {step.get('id')} specifies an untyped parameter in the post-job actions."
)

# unlabeled outputs are checked by gxformat2, no need to check here
def _load_workflow_dict(path: str) -> Dict[str, Any]:
with open(path) as f:
return ordered_load(f)


def _lint_case(path: str, test_case: TestCase, lint_context: WorkflowLintContext) -> bool:
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ galaxy-job-config-init>=0.1.4
galaxy-tool-util[edam,extended-assertions]>=25.1,<26.0
galaxy-util[template]>=24.1,<26.0
glob2
gxformat2>=0.14.0
gxformat2>=0.25.0
h5py
jinja2
lxml
Expand Down
11 changes: 1 addition & 10 deletions tests/data/wf14-unlinted-best-practices.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,8 @@ class: GalaxyWorkflow
label: bp (imported from uploaded file)
uuid: 66708ffe-c08c-4647-a7e9-fc7f731206a1
inputs:
null:
input:
optional: false
position:
bottom: 730.3166656494141
height: 82.55000305175781
left: 1155
right: 1355
top: 647.7666625976562
width: 200
x: 1155
y: 647.7666625976562
type: data
outputs:
_anonymous_output_1:
Expand Down
8 changes: 0 additions & 8 deletions tests/data/wf_repos/basic_native_ok/basic_native.yml
Original file line number Diff line number Diff line change
@@ -1,11 +1,3 @@
inputs:
the_input:
type: File
doc: input doc
id: the_input
outputs:
the_output:
outputSource: cat/out_file1
steps:
'0':
type: data_input
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,3 @@
inputs:
the_input:
type: File
doc: input doc
id: the_input
outputs:
the_output:
outputSource: cat/out_file1
steps:
'0':
type: data_input
Expand Down
2 changes: 2 additions & 0 deletions tests/test_cmd_training_generate_from_wf.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

from .test_utils import (
CliTestCase,
mark,
skip_if_environ,
TEST_DATA_DIR,
)
Expand Down Expand Up @@ -41,6 +42,7 @@ def test_training_generate_from_wf_command_topic(self):
self._check_exit_code(training_fill_data_library_command, exit_code=2)

@skip_if_environ("PLANEMO_SKIP_GALAXY_TESTS")
@mark.tests_galaxy_branch
def test_training_generate_from_wf_command_local_wf(self):
"""Test training_generate_from_wf command with local workflow."""
with self._isolate():
Expand Down
2 changes: 2 additions & 0 deletions tests/test_cmd_training_init.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

from .test_utils import (
CliTestCase,
mark,
skip_if_environ,
TEST_DATA_DIR,
)
Expand Down Expand Up @@ -93,6 +94,7 @@ def test_training_init_command_tutorial_zenodo(self):
self._check_exit_code(training_init_command, exit_code=0)

@skip_if_environ("PLANEMO_SKIP_GALAXY_TESTS")
@mark.tests_galaxy_branch
def test_training_init_command_tutorial_local_wf(self):
"""Test training_init command to create new tutorial with local workflow."""
with self._isolate():
Expand Down
2 changes: 2 additions & 0 deletions tests/test_cmd_workflow_convert.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,15 @@

from .test_utils import (
CliTestCase,
mark,
skip_if_environ,
TEST_DATA_DIR,
)


class CmdWorkflowConvertTestCase(CliTestCase):
@skip_if_environ("PLANEMO_SKIP_GALAXY_TESTS")
@mark.tests_galaxy_branch
def test_gxwf_to_ga(self):
with self._isolate() as f:
gx2_wf_path = os.path.join(TEST_DATA_DIR, "wf1.gxwf.yml")
Expand Down
Loading
Loading