Skip to content
Open
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitlab-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ cwl_v1.2:
- virtualenv -p ${MAIN_PYTHON_PKG} venv && . venv/bin/activate && pip install -U pip wheel && make prepare && make develop extras=[cwl,aws]
- python setup_gitlab_docker.py # login to increase the docker.io rate limit
- make test tests=src/toil/test/cwl/cwlTest.py::CWLv12Test
- make test tests=src/toil/test/cwl/cwlTest.py::CWLToilOptimizeTests

cwl_v1.0_kubernetes:
stage: main_tests
Expand Down
2 changes: 1 addition & 1 deletion src/toil/batchSystems/singleMachine.py
Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,7 @@ def _runDebugJob(self, jobCommand, jobID, environment):
# We can actually run in this thread
jobName, jobStoreLocator, jobStoreID = jobCommand.split()[1:4] # Parse command
jobStore = Toil.resumeJobStore(jobStoreLocator)
toil_worker.workerScript(jobStore, jobStore.config, jobName, jobStoreID,
toil_worker.workerScript(jobStore, jobStore.config, jobName, jobStoreID, None,
redirectOutputToLogFile=not self.debugWorker) # Call the worker
else:
# Run synchronously. If starting or running the command fails, let the exception stop us.
Expand Down
8 changes: 6 additions & 2 deletions src/toil/leader.py
Original file line number Diff line number Diff line change
Expand Up @@ -664,8 +664,8 @@ def innerLoop(self):

# Consistency check the toil state
assert self.toilState.updatedJobs == {}
assert self.toilState.successorCounts == {}
assert self.toilState.successorJobStoreIDToPredecessorJobs == {}
#assert self.toilState.successorCounts == {}
#assert self.toilState.successorJobStoreIDToPredecessorJobs == {}
assert self.toilState.serviceJobStoreIDToPredecessorJob == {}
assert self.toilState.servicesIssued == {}
# assert self.toilState.jobsToBeScheduledWithMultiplePredecessors # These are not properly emptied yet
Expand Down Expand Up @@ -752,6 +752,10 @@ def issueJob(self, jobNode):
workerCommand.append('--context')
workerCommand.append(base64.b64encode(pickle.dumps(context)).decode('utf-8'))

# add the toilState as a pickle
workerCommand.append('--toilState')
workerCommand.append(base64.b64encode(pickle.dumps(self.toilState)).decode('utf-8'))

jobNode.command = ' '.join(workerCommand)
# jobBatchSystemID is an int that is an incremented counter for each job
jobBatchSystemID = self.batchSystem.issueBatchJob(jobNode)
Expand Down
48 changes: 48 additions & 0 deletions src/toil/test/cwl/cwlTest.py
Original file line number Diff line number Diff line change
Expand Up @@ -767,6 +767,54 @@ def test_download_structure(self) -> None:
call(fid1, os.path.join(to_dir, 'dir1/dir2/f1again'), symlink=True),
call(fid2, os.path.join(to_dir, 'anotherfile'), symlink=True)], any_order=True)

@needs_cwl
class CWLToilOptimizeTests(ToilTest):
def setUp(self):
"""Runs anew before each test to create farm fresh temp dirs."""
self.outDir = f'/tmp/toil-cwl-test-{str(uuid.uuid4())}'
os.makedirs(self.outDir)
self.rootDir = self._projectRootPath()
self.jobDir = os.path.join(self.outDir, 'jobStore')
self.statDir = os.path.join(self.jobDir, 'stats')

def tearDown(self):
"""Clean up outputs."""
if os.path.exists(self.outDir):
shutil.rmtree(self.outDir)
unittest.TestCase.tearDown(self)

def _tester(self, cwlfile, jobfile, expect, main_args=[]):
from toil.cwl import cwltoil
st = StringIO()
main_args = main_args[:]
main_args.extend(['--logDebug','--stats','--outdir', self.outDir, '--jobStore', self.jobDir,
os.path.join(self.rootDir, cwlfile), os.path.join(self.rootDir, jobfile)])
cwltoil.main(main_args, stdout=st)
out = self._extract_job_lists()
self.assertEqual(out, expect)

def _match_extract_string(self, stringin):
import re
search_pattern = re.compile('^.* (\w*) kind-CWLJob/instance-.*$')
if search_pattern.match(stringin):
return(search_pattern.sub(r'\1',stringin))
else:
return(None)

def _extract_job_lists(self):
worker_list = []
for filename in os.listdir(self.statDir):
with open(os.path.join(self.statDir,filename)) as f:
test_json = json.load(f)
if 'workers' in test_json.keys() and len(test_json['jobs']) > 0:
job_list = [self._match_extract_string(x) for x in test_json['logs']['names']]
if not all(x == None for x in job_list):
worker_list.append(job_list)
worker_list.sort()
return(worker_list)

def test_biobb_fail(self):
self._tester('src/toil/test/cwl/md_list_reduced.cwl',
'src/toil/test/cwl/md_list_reduced.json',
[['genion', 'grompp', 'pdb2gmx', 'editconf', 'solvate']],
main_args=[])
Loading