Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions BOTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,15 @@ Tests use pytest. Example commands:
./venv/bin/python -m pytest src/toil/test -k "safe" -v
```

## Running Make Targets (mypy, tests, etc.)

The `Makefile` targets require the virtualenv to be activated. Some targets (like `test_debug`) enforce this with a `check_venv` guard that checks for `VIRTUAL_ENV` in the environment. Set `PATH` and `VIRTUAL_ENV` to satisfy this without needing `source`:

```bash
PATH="./venv/bin:$PATH" VIRTUAL_ENV=./venv make mypy
PATH="./venv/bin:$PATH" VIRTUAL_ENV=./venv make test_debug tests='src/toil/test/path/to/test.py::TestClass::test_name'
```

## Running Individual WDL Spec Unit Tests

The WDL spec embeds example workflows as unit tests (under the `wdl-1.1` and `wdl-1.2` branches of `https://github.com/openwdl/wdl`). `TestWDLConformance.test_single_unit_test` in `src/toil/test/wdl/wdltoil_test.py` runs one such test at a time and is controlled by environment variables:
Expand All @@ -40,3 +49,7 @@ WDL_UNIT_TEST_ID=serde_pair ./venv/bin/python -m pytest \
```

This test clones remote git repos and may be slow. Many WDL spec tasks run inside containers, so **Docker must be running** — if the test fails with a Docker connection error, ask the user to start Docker before retrying.

## Code Style

Docstrings state the contract (what, not how). Implementation strategy goes in comments on the relevant code. Explain a concept once in a canonical docstring; reference it elsewhere. Names must be precise: avoid redundant qualifiers, invented terms, and overclaiming, and do not use test content as examples.
1 change: 1 addition & 0 deletions requirements-dev.txt
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,4 @@ pytest-xdist
build
check-jsonschema
strip_ansi==0.1.1
edit_distance>=1.0.7,<2
14 changes: 11 additions & 3 deletions src/toil/cwl/cwltoil.py
Original file line number Diff line number Diff line change
Expand Up @@ -2167,10 +2167,14 @@ def ensure_file_imported(
logger.debug("Sending file at: %s", file_metadata["location"])


def writeGlobalFileWrapper(file_store: AbstractFileStore, fileuri: str) -> FileID:
def writeGlobalFileWrapper(
file_store: AbstractFileStore, fileuri: str, hints: list[str] | None = None
) -> FileID:
"""Wrap writeGlobalFile to accept file:// URIs."""
fileuri = fileuri if ":/" in fileuri else f"file://{fileuri}"
return file_store.writeGlobalFile(schema_salad.ref_resolver.uri_file_path(fileuri))
return file_store.writeGlobalFile(
schema_salad.ref_resolver.uri_file_path(fileuri), hints=hints
)


def remove_empty_listings(rec: CWLDirectoryType) -> None:
Expand Down Expand Up @@ -2229,6 +2233,9 @@ def __init__(
# We need something. Put the class.
name_parts.append(class_name)

# Keep the structured path for use as file hints.
self.task_path = name_parts

# String together the hierarchical name
unit_name = ".".join(name_parts)

Expand Down Expand Up @@ -2878,9 +2885,10 @@ def run(self, file_store: AbstractFileStore) -> Any:
fs_access = runtime_context.make_fs_access(runtime_context.basedir)

# And a file importer that can go from a file:// URI to a Toil FileID
hints = self.task_path or None
def file_import_function(url: str, log_level: int = logging.DEBUG) -> FileID:
logger.log(log_level, "Loading %s...", url)
return writeGlobalFileWrapper(file_store, url)
return writeGlobalFileWrapper(file_store, url, hints=hints)

file_visitor = functools.partial(
ensure_file_imported,
Expand Down
17 changes: 15 additions & 2 deletions src/toil/fileStores/abstractFileStore.py
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ def getLocalTempFileName(

# Functions related to reading, writing and removing files to/from the job store
@abstractmethod
def writeGlobalFile(self, localFileName: str, cleanup: bool = False) -> FileID:
def writeGlobalFile(self, localFileName: str, cleanup: bool = False, hints: list[str] | None = None) -> FileID:
"""
Upload a file (as a path) to the job store.

Expand All @@ -319,6 +319,9 @@ def writeGlobalFile(self, localFileName: str, cleanup: bool = False) -> FileID:
:param cleanup: if True then the copy of the global file will be deleted once
the job and all its successors have completed running. If not the global
file must be deleted manually.
:param hints: Optional human-readable path hints; see
:class:`~toil.jobStores.abstractJobStore.AbstractJobStore` for
details.

:return: an ID that can be used to retrieve the file.
"""
Expand All @@ -329,6 +332,7 @@ def writeGlobalFileStream(
self,
cleanup: bool = False,
basename: str | None = None,
hints: list[str] | None = None,
encoding: str | None = None,
errors: str | None = None,
) -> Iterator[tuple[WriteWatchingStream, FileID]]:
Expand All @@ -349,12 +353,21 @@ def writeGlobalFileStream(
file basename so that when searching the job store with a query
matching that basename, the file will be detected.

:param hints: Optional human-readable path hints; see
:class:`~toil.jobStores.abstractJobStore.AbstractJobStore` for
details.

:return: A context manager yielding a tuple of
1) a file handle which can be written to and
2) the toil.fileStores.FileID of the resulting file in the job store.
"""
with self.jobStore.write_file_stream(
str(self.jobDesc.jobStoreID), cleanup, basename, encoding, errors
str(self.jobDesc.jobStoreID),
cleanup=cleanup,
basename=basename,
hints=hints,
encoding=encoding,
errors=errors,
) as (backingStream, fileStoreID):
# We have a string version of the file ID, and the backing stream.
# We need to yield a stream the caller can write to, and a FileID
Expand Down
4 changes: 2 additions & 2 deletions src/toil/fileStores/cachingFileStore.py
Original file line number Diff line number Diff line change
Expand Up @@ -1207,7 +1207,7 @@ def open(self, job: Job) -> Generator[None, None, None]:
# its temp dir and database entry.
self._deallocateSpaceForJob()

def writeGlobalFile(self, localFileName, cleanup=False):
def writeGlobalFile(self, localFileName, cleanup=False, hints=None):
"""
Creates a file in the jobstore and returns a FileID reference.
"""
Expand All @@ -1224,7 +1224,7 @@ def writeGlobalFile(self, localFileName, cleanup=False):
# Make sure to pass along the file basename.
# TODO: this empty file could leak if we die now...
fileID = self.jobStore.get_empty_file_store_id(
creatorID, cleanup, os.path.basename(localFileName)
creatorID, cleanup, os.path.basename(localFileName), hints=hints
)
# Work out who we are
with self.as_process() as me:
Expand Down
4 changes: 2 additions & 2 deletions src/toil/fileStores/nonCachingFileStore.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,10 +121,10 @@ def open(self, job: Job) -> Generator[None, None, None]:
self.jobStateFile,
)

def writeGlobalFile(self, localFileName: str, cleanup: bool = False) -> FileID:
def writeGlobalFile(self, localFileName: str, cleanup: bool = False, hints: list[str] | None = None) -> FileID:
absLocalFileName = self._resolveAbsoluteLocalPath(localFileName)
creatorID = str(self.jobDesc.jobStoreID)
fileStoreID = self.jobStore.write_file(absLocalFileName, creatorID, cleanup)
fileStoreID = self.jobStore.write_file(absLocalFileName, creatorID, cleanup, hints=hints)
if absLocalFileName.startswith(self.localTempDir):
# Only files in the appropriate directory should become local files
# we can delete with deleteLocalFile
Expand Down
91 changes: 84 additions & 7 deletions src/toil/jobStores/abstractJobStore.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,18 @@ class AbstractJobStore(ABC):

To actually get ahold of a :class:`toil.job.Job`, use
:meth:`toil.job.Job.loadJob` with a JobStore and the relevant JobDescription.

.. rubric:: File Hints

File-writing methods accept an optional ``hints`` parameter: a list of
strings (e.g. workflow name, task name) that are incorporated into the
stored file's path or key so a human browsing the job store can locate
it. Each hint becomes a path component after sanitization. Two files
written with the same hints and basename are guaranteed distinct IDs;
disambiguation is handled by appending a numbered directory (``0/``,
``1/``, ...). Deleted slots are never reused (the empty directory or a
tombstone object prevents reallocation), which is required because Toil
may journal and replay file deletions.
"""

def __init__(self, locator: str) -> None:
Expand Down Expand Up @@ -1084,22 +1096,64 @@ def jobs(self) -> Iterator[JobDescription]:
raise NotImplementedError()

##########################################
# The following provide an way of creating/reading/writing/updating files
# associated with a given job.
# The following provide a way of creating/reading/writing/updating files
# associated with a given job. See the "File Hints" section in the
# class docstring for the hints system.
##########################################

# Strips everything except characters safe in filesystem paths, S3 keys,
# and urllib quote() output: ASCII letters, digits, underscore, dot, hyphen.
HINT_SAFE_RE = re.compile(r"[^a-zA-Z0-9_.-]")
# Maximum length of a single sanitized hint path component.
MAX_HINT_LENGTH = 40
# Maximum total length of the hints portion of the path (joined with /).
MAX_HINTS_PATH_LENGTH = 120

def _sanitize_hints(self, hints: list[str] | None) -> list[str]:
"""
Produce path-safe components from user-supplied hint strings.

Each hint is stripped of unsafe characters and truncated to
:attr:`MAX_HINT_LENGTH`. Hints are collected in order until
adding the next one (plus a ``/`` separator) would exceed
:attr:`MAX_HINTS_PATH_LENGTH`, then the rest are dropped.
"""
if not hints:
return []
result: list[str] = []
total = 0
for hint in hints:
cleaned = self.HINT_SAFE_RE.sub("", hint)[: self.MAX_HINT_LENGTH]
if not cleaned:
continue
needed = len(cleaned) + (1 if result else 0)
if total + needed > self.MAX_HINTS_PATH_LENGTH:
break
total += needed
result.append(cleaned)
return result

# Don't add any new arguments to this old version; make people use the new one!
@deprecated(new_function_name="write_file")
def writeFile(
self,
localFilePath: str,
jobStoreID: str | None = None,
cleanup: bool = False,
) -> str:
return self.write_file(localFilePath, jobStoreID, cleanup)
return self.write_file(
localFilePath,
job_id=jobStoreID,
cleanup=cleanup
)

@abstractmethod
def write_file(
self, local_path: str, job_id: str | None = None, cleanup: bool = False
self,
local_path: str,
job_id: str | None = None,
cleanup: bool = False,
hints: list[str] | None = None,
) -> str:
"""
Takes a file (as a path) and places it in this job store. Returns an ID that can be used
Expand All @@ -1118,6 +1172,9 @@ def write_file(
whose jobStoreID was given as jobStoreID is deleted with
jobStore.delete(job). If jobStoreID was not given, does nothing.

:param hints: Optional human-readable path hints; see the file
operations section comment on this class for details.

:raise ConcurrentFileModificationException: if the file was modified concurrently during
an invocation of this method

Expand All @@ -1131,6 +1188,7 @@ def write_file(
"""
raise NotImplementedError()

# Don't add any new arguments to this old version; make people use the new one!
@deprecated(new_function_name="write_file_stream")
def writeFileStream(
self,
Expand All @@ -1140,14 +1198,21 @@ def writeFileStream(
encoding: str | None = None,
errors: str | None = None,
) -> ContextManager[tuple[IO[bytes], str]]:
return self.write_file_stream(jobStoreID, cleanup, basename, encoding, errors)
return self.write_file_stream(
jobStoreID,
cleanup=cleanup,
basename=basename,
encoding=encoding,
errors=errors
)

@abstractmethod
@contextmanager
def write_file_stream(
self,
job_id: str | None = None,
cleanup: bool = False,
hints: list[str] | None = None,
basename: str | None = None,
encoding: str | None = None,
errors: str | None = None,
Expand All @@ -1171,6 +1236,9 @@ def write_file_stream(
file basename so that when searching the job store with a query
matching that basename, the file will be detected.

:param hints: Optional human-readable path hints; see the file
operations section comment on this class for details.

:param str encoding: the name of the encoding used to encode the file. Encodings are the same
as for encode(). Defaults to None which represents binary mode.

Expand All @@ -1189,22 +1257,28 @@ def write_file_stream(
:rtype: Iterator[Tuple[IO[bytes], str]]
"""
raise NotImplementedError()


# Don't add any new arguments to this old version; make people use the new one!
@deprecated(new_function_name="get_empty_file_store_id")
def getEmptyFileStoreID(
self,
jobStoreID: str | None = None,
cleanup: bool = False,
basename: str | None = None,
) -> str:
return self.get_empty_file_store_id(jobStoreID, cleanup, basename)
return self.get_empty_file_store_id(
job_id=jobStoreID,
cleanup=cleanup,
basename=basename
)

@abstractmethod
def get_empty_file_store_id(
self,
job_id: str | None = None,
cleanup: bool = False,
basename: str | None = None,
hints: list[str] | None = None,
) -> str:
"""
Creates an empty file in the job store and returns its ID.
Expand All @@ -1221,6 +1295,9 @@ def get_empty_file_store_id(
file basename so that when searching the job store with a query
matching that basename, the file will be detected.

:param hints: Optional human-readable path hints; see the file
operations section comment on this class for details.

:return: a jobStoreFileID that references the newly created file and can be used to reference the
file in the future.
:rtype: str
Expand Down
Loading
Loading