Skip to content
Closed
Show file tree
Hide file tree
Changes from 46 commits
Commits
Show all changes
62 commits
Select commit Hold shift + click to select a range
8d08a68
Permit multiprocessing groups in YAML
richardpaulhudson May 9, 2022
12e8600
Basic multiprocessing functionality
richardpaulhudson May 9, 2022
e3b4ee7
Mypy corrections
richardpaulhudson May 9, 2022
a2bd489
Secondary functionality and documentation
richardpaulhudson May 10, 2022
8c8b81a
Fixed formatting issues
richardpaulhudson May 10, 2022
a481698
Corrections
richardpaulhudson May 10, 2022
ae82568
Changes after review
richardpaulhudson May 20, 2022
4daffdd
Changes based on review
richardpaulhudson May 23, 2022
2eb13f2
Readability improvement
richardpaulhudson May 23, 2022
9e665f9
Changes after internal discussions
richardpaulhudson Jun 20, 2022
5de1009
Add max_parallel_processes documentation
richardpaulhudson Jun 20, 2022
1cdb92d
Correction
richardpaulhudson Jun 20, 2022
e6a11b5
Extend scope of test
richardpaulhudson Jun 20, 2022
9d7a79e
First draft of new implementation (incomplete, doesn't run yet)
richardpaulhudson Jul 14, 2022
3c64e82
Correction
richardpaulhudson Jul 14, 2022
ca403f8
Seems to work, not yet tested in a structured way
richardpaulhudson Jul 18, 2022
e9ee680
Saved (intermediate version, doesn't compile yet)
richardpaulhudson Jul 18, 2022
4c2fc56
Refactoring into separate module
richardpaulhudson Jul 19, 2022
83d0738
Remove unnecessary changes
richardpaulhudson Jul 19, 2022
2c1f58e
Formal state machine
richardpaulhudson Jul 19, 2022
012578f
Improvements / corrections
richardpaulhudson Jul 19, 2022
44d51f4
Add failure test
richardpaulhudson Jul 19, 2022
d87fcab
Corrections / improvements
richardpaulhudson Jul 19, 2022
bc79e5a
Fix for Windows
richardpaulhudson Jul 19, 2022
9d005a7
Correct test
richardpaulhudson Jul 20, 2022
5d150f2
Add comment
richardpaulhudson Jul 20, 2022
f8301b4
Correction
richardpaulhudson Jul 20, 2022
d2bde9a
Reverse unnecessary change
richardpaulhudson Jul 20, 2022
91a173f
Add note to projects.md
richardpaulhudson Jul 20, 2022
4614a89
Improve error message
richardpaulhudson Jul 20, 2022
4902dd6
Final touches
richardpaulhudson Jul 20, 2022
e2c2ba4
Fix Mypy
richardpaulhudson Jul 20, 2022
48803e1
Improve document output
richardpaulhudson Jul 20, 2022
fcf7b6b
Use multiprocessing context
richardpaulhudson Jul 21, 2022
6b0ebcd
Improve error handling with hung processes
richardpaulhudson Jul 21, 2022
1650912
Add failure messages
richardpaulhudson Jul 21, 2022
fd8dbfd
Initial changes based on review comments
richardpaulhudson Jul 22, 2022
5fad119
Update spacy/tests/test_parallel.py
richardpaulhudson Jul 22, 2022
c7a8956
Update spacy/cli/_util.py
richardpaulhudson Jul 22, 2022
c6a4a7b
Merge 'origin/master' into feature/projects-multiprocessing
richardpaulhudson Jul 22, 2022
ac81dc9
Revert accidentally checked-in line
richardpaulhudson Jul 22, 2022
6ac15ad
Correct comment
richardpaulhudson Jul 23, 2022
4eb61a7
More updates based on review comments
richardpaulhudson Jul 25, 2022
10513a0
Format with black
richardpaulhudson Jul 25, 2022
567d006
Log to temporary directory
richardpaulhudson Jul 25, 2022
3d16625
Increase timeout to support GPU tests
richardpaulhudson Jul 25, 2022
5393df4
More changes based on review comments
richardpaulhudson Jul 27, 2022
8faf070
Specify new wasabi version
richardpaulhudson Jul 27, 2022
40416b1
Restore previous wasabi peg
richardpaulhudson Jul 27, 2022
1bf82db
Widened errors caught from os.kill()
richardpaulhudson Aug 24, 2022
78ee9c3
Revert to diagnose error
richardpaulhudson Aug 24, 2022
5aa95ce
Merge branch 'master' into feature/projects-multiprocessing
richardpaulhudson Oct 4, 2022
70fa1ce
Copied changes from spaCy/tmp/project-multiprocess
richardpaulhudson Oct 4, 2022
afba051
Improve error logging
richardpaulhudson Oct 4, 2022
b48f2e1
Correction
richardpaulhudson Oct 4, 2022
522b0ed
Handle PermissionError in Windows CI
richardpaulhudson Oct 4, 2022
c8b7912
Correction
richardpaulhudson Oct 4, 2022
786473d
Switch to use TemporaryDirectory
richardpaulhudson Oct 4, 2022
b8a299f
Merge branch 'explosion:master' into feature/projects-multiprocessing
richardpaulhudson Oct 4, 2022
2cc2cc1
Use mkdtemp()
richardpaulhudson Oct 4, 2022
cfaa902
Merge branch 'master' into feature/projects-multiprocessing
richardpaulhudson Jan 23, 2023
b3bcfe5
Empty commit to trigger CI
richardpaulhudson Jan 24, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
190 changes: 175 additions & 15 deletions spacy/cli/_util.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from typing import Dict, Any, Union, List, Optional, Tuple, Iterable
from typing import TYPE_CHECKING, overload
from typing import TYPE_CHECKING, overload, cast
import sys
import shutil
from pathlib import Path
Expand All @@ -18,8 +18,12 @@

from ..compat import Literal
from ..schemas import ProjectConfigSchema, validate
from ..git_info import GIT_VERSION
from ..util import import_file, run_command, make_tempdir, registry, logger
from ..util import is_compatible_version, SimpleFrozenDict, ENV_VARS
from ..util import is_minor_version_match


from .. import about

if TYPE_CHECKING:
Expand Down Expand Up @@ -157,6 +161,7 @@ def load_project_config(
print("\n".join(errors))
sys.exit(1)
validate_project_version(config)
validate_max_parallel_processes(config)
validate_project_commands(config)
# Make sure directories defined in config exist
for subdir in config.get("directories", []):
Expand Down Expand Up @@ -199,7 +204,7 @@ def substitute_project_variables(


def validate_project_version(config: Dict[str, Any]) -> None:
"""If the project defines a compatible spaCy version range, chec that it's
"""If the project defines a compatible spaCy version range, check that it's
compatible with the current version of spaCy.

config (Dict[str, Any]): The loaded config.
Expand All @@ -215,30 +220,67 @@ def validate_project_version(config: Dict[str, Any]) -> None:
msg.fail(err, exits=1)


def validate_max_parallel_processes(config: Dict[str, Any]) -> None:
"""If the project defines a maximum number of parallel processes, check that the
value is within the permitted range.

config (Dict[str, Any]): The loaded config.
"""
max_parallel_processes = config.get("max_parallel_processes", None)
if max_parallel_processes is not None and max_parallel_processes < 2:
err = (
f"The {PROJECT_FILE} specifies a value for max_parallel_processes ({max_parallel_processes}) "
f"that is less than 2."
)
msg.fail(err, exits=1)


def verify_workflow_step(workflow_name: str, commands: List[str], step: str) -> None:
if step not in commands:
msg.fail(
f"Unknown command specified in workflow '{workflow_name}': {step}",
f"Workflows can only refer to commands defined in the 'commands' "
f"section of the {PROJECT_FILE}.",
exits=1,
)


def validate_project_commands(config: Dict[str, Any]) -> None:
"""Check that project commands and workflows are valid, don't contain
duplicates, don't clash and only refer to commands that exist.
duplicates, don't clash and only refer to commands that exist.

config (Dict[str, Any]): The loaded config.
"""
command_names = [cmd["name"] for cmd in config.get("commands", [])]

commands = [cmd["name"] for cmd in config.get("commands", [])]
workflows = config.get("workflows", {})
duplicates = set([cmd for cmd in command_names if command_names.count(cmd) > 1])
duplicates = set([cmd for cmd in commands if commands.count(cmd) > 1])
if duplicates:
err = f"Duplicate commands defined in {PROJECT_FILE}: {', '.join(duplicates)}"
msg.fail(err, exits=1)
for workflow_name, workflow_steps in workflows.items():
if workflow_name in command_names:
for workflow_name, workflow_items in workflows.items():
if workflow_name in commands:
err = f"Can't use workflow name '{workflow_name}': name already exists as a command"
msg.fail(err, exits=1)
for step in workflow_steps:
if step not in command_names:
msg.fail(
f"Unknown command specified in workflow '{workflow_name}': {step}",
f"Workflows can only refer to commands defined in the 'commands' "
f"section of the {PROJECT_FILE}.",
exits=1,
)
for workflow_item in workflow_items:
if isinstance(workflow_item, str):
verify_workflow_step(workflow_name, commands, workflow_item)
else:
steps = cast(List[str], workflow_item["parallel"])
if len(steps) < 2:
msg.fail(
f"Invalid parallel group within '{workflow_name}'.",
f"A parallel group must reference at least two commands.",
exits=1,
)
if len(steps) != len(set(steps)):
msg.fail(
f"Invalid parallel group within '{workflow_name}'.",
f"A parallel group may not contain a command more than once.",
exits=1,
)
for step in steps:
verify_workflow_step(workflow_name, commands, step)


def get_hash(data, exclude: Iterable[str] = tuple()) -> str:
Expand Down Expand Up @@ -573,3 +615,121 @@ def setup_gpu(use_gpu: int, silent=None) -> None:
local_msg.info("Using CPU")
if gpu_is_available():
local_msg.info("To switch to GPU 0, use the option: --gpu-id 0")
Comment thread
richardpaulhudson marked this conversation as resolved.


def check_rerun(
project_dir: Path,
command: Dict[str, Any],
*,
check_spacy_version: bool = True,
check_spacy_commit: bool = False,
) -> bool:
"""Check if a command should be rerun because its settings or inputs/outputs
changed.

project_dir (Path): The current project directory.
command (Dict[str, Any]): The command, as defined in the project.yml.
strict_version (bool):
RETURNS (bool): Whether to re-run the command.
"""
# Always rerun if no-skip is set
if command.get("no_skip", False):
return True
lock_path = project_dir / PROJECT_LOCK
if not lock_path.exists(): # We don't have a lockfile, run command
return True
data = srsly.read_yaml(lock_path)
if command["name"] not in data: # We don't have info about this command
return True
entry = data[command["name"]]
# Always run commands with no outputs (otherwise they'd always be skipped)
if not entry.get("outs", []):
return True
# Always rerun if spaCy version or commit hash changed
spacy_v = entry.get("spacy_version")
commit = entry.get("spacy_git_version")
if check_spacy_version and not is_minor_version_match(spacy_v, about.__version__):
info = f"({spacy_v} in {PROJECT_LOCK}, {about.__version__} current)"
msg.info(f"Re-running '{command['name']}': spaCy minor version changed {info}")
return True
if check_spacy_commit and commit != GIT_VERSION:
info = f"({commit} in {PROJECT_LOCK}, {GIT_VERSION} current)"
msg.info(f"Re-running '{command['name']}': spaCy commit changed {info}")
return True
# If the entry in the lockfile matches the lockfile entry that would be
# generated from the current command, we don't rerun because it means that
# all inputs/outputs, hashes and scripts are the same and nothing changed
lock_entry = _get_lock_entry(project_dir, command)
exclude = ["spacy_version", "spacy_git_version"]
return get_hash(lock_entry, exclude=exclude) != get_hash(entry, exclude=exclude)


def update_lockfile(
project_dir: Path,
command: Dict[str, Any],
) -> None:
"""Update the lockfile after running a command. Will create a lockfile if
it doesn't yet exist and will add an entry for the current command, its
script and dependencies/outputs.

project_dir (Path): The current project directory.
command (Dict[str, Any]): The command, as defined in the project.yml.
"""
lock_path = project_dir / PROJECT_LOCK
if not lock_path.exists():
srsly.write_yaml(lock_path, {})
data = {}
else:
data = srsly.read_yaml(lock_path)
data[command["name"]] = _get_lock_entry(project_dir, command)
srsly.write_yaml(lock_path, data)


def check_deps(cmd: Dict, cmd_name: str, project_dir: Path, dry: bool):
for dep in cmd.get("deps", []):
if not (project_dir / dep).exists():
err = f"Missing dependency specified by command '{cmd_name}': {dep}"
err_help = "Maybe you forgot to run the 'project assets' command or a previous step?"
err_kwargs = {"exits": 1} if not dry else {}
msg.fail(err, err_help, **err_kwargs)


def _get_lock_entry(project_dir: Path, command: Dict[str, Any]) -> Dict[str, Any]:
"""Get a lockfile entry for a given command. An entry includes the command,
the script (command steps) and a list of dependencies and outputs with
their paths and file hashes, if available. The format is based on the
dvc.lock files, to keep things consistent.

project_dir (Path): The current project directory.
command (Dict[str, Any]): The command, as defined in the project.yml.
RETURNS (Dict[str, Any]): The lockfile entry.
"""
deps = _get_fileinfo(project_dir, command.get("deps", []))
outs = _get_fileinfo(project_dir, command.get("outputs", []))
outs_nc = _get_fileinfo(project_dir, command.get("outputs_no_cache", []))
return {
"cmd": f"{COMMAND} run {command['name']}",
"script": command["script"],
"deps": deps,
"outs": [*outs, *outs_nc],
"spacy_version": about.__version__,
"spacy_git_version": GIT_VERSION,
}


def _get_fileinfo(
project_dir: Path, paths: List[str]
) -> List[Dict[str, Optional[str]]]:
"""Generate the file information for a list of paths (dependencies, outputs).
Includes the file path and the file's checksum.

project_dir (Path): The current project directory.
paths (List[str]): The file paths.
RETURNS (List[Dict[str, str]]): The lockfile entry for a file.
"""
data = []
for path in paths:
file_path = project_dir / path
md5 = get_checksum(file_path) if file_path.exists() else None
data.append({"path": path, "md5": md5})
return data
19 changes: 16 additions & 3 deletions spacy/cli/project/document.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,9 @@
Commands are only re-run if their inputs have changed."""
INTRO_WORKFLOWS = f"""The following workflows are defined by the project. They
can be executed using [`spacy project run [name]`]({DOCS_URL}/api/cli#project-run)
and will run the specified commands in order. Commands are only re-run if their
inputs have changed."""
and will run the specified commands in order. Commands grouped within square brackets
are run in parallel. Commands are only re-run if their inputs
have changed."""
INTRO_ASSETS = f"""The following assets are defined by the project. They can
be fetched by running [`spacy project assets`]({DOCS_URL}/api/cli#project-assets)
in the project directory."""
Expand Down Expand Up @@ -69,7 +70,19 @@ def project_document(
md.add(md.table(data, ["Command", "Description"]))
# Workflows
wfs = config.get("workflows", {}).items()
data = [(md.code(n), " &rarr; ".join(md.code(w) for w in stp)) for n, stp in wfs]
data = []
for n, steps in wfs:
rendered_steps = []
for step in steps:
if isinstance(step, str):
rendered_steps.append(md.code(step))
else:
rendered_steps.append(
"["
+ ", ".join(md.code(p_step) for p_step in step["parallel"])
+ "]"
)
data.append([md.code(n), " &rarr; ".join(rendered_steps)])
if data:
md.add(md.title(3, "Workflows", "⏭"))
md.add(INTRO_WORKFLOWS)
Expand Down
12 changes: 11 additions & 1 deletion spacy/cli/project/dvc.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
"""This module contains helpers and subcommands for integrating spaCy projects
with Data Version Controk (DVC). https://dvc.org"""
with Data Version Control (DVC). https://dvc.org"""
from typing import Dict, Any, List, Optional, Iterable
import subprocess
from pathlib import Path
Expand Down Expand Up @@ -106,6 +106,11 @@ def update_dvc_config(
dvc_commands = []
config_commands = {cmd["name"]: cmd for cmd in config.get("commands", [])}
for name in workflows[workflow]:
if isinstance(name, dict) and "parallel" in name:
msg.fail(
f"A DVC workflow may not contain parallel groups",
exits=1,
)
command = config_commands[name]
deps = command.get("deps", [])
outputs = command.get("outputs", [])
Expand All @@ -123,6 +128,11 @@ def update_dvc_config(
dvc_cmd.append("--always-changed")
full_cmd = [*dvc_cmd, *deps_cmd, *outputs_cmd, *outputs_nc_cmd, *project_cmd]
dvc_commands.append(join_command(full_cmd))
if len(dvc_commands) == 0:
msg.fail(
f"A DVC workflow must have at least one dependency or output",
exits=1,
)
with working_dir(path):
dvc_flags = {"--verbose": verbose, "--quiet": silent}
run_dvc_commands(dvc_commands, flags=dvc_flags)
Expand Down
Loading