diff --git a/src/toil_vg/vg_common.py b/src/toil_vg/vg_common.py
index c2188cc4..0b1e809e 100644
--- a/src/toil_vg/vg_common.py
+++ b/src/toil_vg/vg_common.py
@@ -15,6 +15,7 @@
 import logging
 from distutils.spawn import find_executable
 import collections
+import inspect
 import socket
 import uuid
 import platform
@@ -914,6 +915,60 @@ def title_to_filename(kind, i, title, extension):
     return ''.join(part_list)
 
 
+def ensure_disk_bytes(job, job_function, required_disk_bytes):
+    """
+    Make sure the given running job is running with at least the given number of
+    disk bytes.
+
+    If so, returns None.
+
+    If not, queues up the job_function again with the same arguments and the
+    given disk limit, and returns a promise for the value the caller should
+    return.
+
+    Should be called directly by a Toil job function and not through an
+    intermediate.
+
+    Will cause problems if anyone has already added child jobs to the job being
+    re-queued that expect to work with its return value.
+    """
+
+    if job.disk < required_disk_bytes:
+        # We need more space
+
+        # Grab our caller's stack frame.
+        job_frame = inspect.stack()[1][0]
+        try:
+            # Grab the arg names and the dict to look them up in
+            (job_run_args, job_run_varargs, job_run_kwargs, job_locals) = inspect.getargvalues(job_frame)
+            # Drop the job argument itself and look them all up in job_locals.
+            # job_run_args[0]'s value should always be the job itself, and we don't need to pass that along.
+            job_rerun_args = [job_locals[name] for name in job_run_args[1:]]
+            if job_run_varargs is not None:
+                # Append all the varargs VALUES to the re-run argument list.
+                # (Appending to job_run_args, the name list, would drop them.)
+                job_rerun_args += job_locals[job_run_varargs]
+
+            # Get the base kwargs that came into the job, or {} if there were none
+            job_rerun_kwargs = {} if job_run_kwargs is None else dict(job_locals[job_run_kwargs])
+            # Add the cores/memory/disk that Toil will pull out
+            job_rerun_kwargs.update({"cores": job.cores, "memory": job.memory, "disk": required_disk_bytes})
+
+            RealtimeLogger.info("Re-queueing {} because we only have {}/{} estimated necessary disk space.".format(
+                job_function.__name__, job.disk, required_disk_bytes))
+            # Queue the job again as a child with more disk.
+            promise = job.addChildJobFn(job_function, *job_rerun_args, **job_rerun_kwargs).rv()
+
+            return promise
+        finally:
+            del job_frame
+
+    else:
+        # The job has sufficient disk
+        RealtimeLogger.info("Job {} has {}/{} estimated necessary disk space.".format(
+            job_function.__name__, job.disk, required_disk_bytes))
+
+        return None
+
 def ensure_disk(job, job_fn, job_fn_args, job_fn_kwargs, file_id_list, factor=8, padding=1024 ** 3):
     """
     Ensure that the currently running job has enough disk to load all the given
diff --git a/src/toil_vg/vg_map.py b/src/toil_vg/vg_map.py
index 0f522edf..05425dff 100755
--- a/src/toil_vg/vg_map.py
+++ b/src/toil_vg/vg_map.py
@@ -22,7 +22,7 @@
 from toil.realtimeLogger import RealtimeLogger
 from toil_vg.vg_common import *
 from toil_vg.context import Context, run_write_info_to_outstore
-from toil_vg.vg_surject import *
+from toil_vg.vg_surject import run_merge_bams, run_whole_surject
 
 logger = logging.getLogger(__name__)
 
@@ -271,7 +271,13 @@ def run_split_reads(job, context, fastq, gam_input_reads, bam_input_reads, reads
 
 
 def run_split_fastq(job, context, fastq, fastq_i, sample_fastq_id):
-    
+
+    disk_required = job.fileStore.getGlobalFileSize(sample_fastq_id) * 2 + (2 * 1024**3)
+    requeued = ensure_disk_bytes(job, run_split_fastq, disk_required)
+    if requeued is not None:
+        # Not enough disk: the job was re-queued with more, so return the promise.
+        return requeued
+
     RealtimeLogger.info("Starting fastq split")
     start_time = timeit.default_timer()
 
@@ -323,6 +329,13 @@ def run_split_gam_reads(job, context, gam_input_reads, gam_reads_file_id):
     """ split up an input reads file in GAM format """
 
 
+
+    disk_required = job.fileStore.getGlobalFileSize(gam_reads_file_id) * 2 + (2 * 1024**3)
+    requeued = ensure_disk_bytes(job, run_split_gam_reads, disk_required)
+    if requeued is not None:
+        # Not enough disk: the job was re-queued with more, so return the promise.
+        return requeued
+
     RealtimeLogger.info("Starting gam split")
     start_time = timeit.default_timer()
 
@@ -359,6 +372,13 @@ def run_split_bam_reads(job, context, bam_input_reads, bam_reads_file_id):
     """ split up an input reads file in BAM format """
 
 
+
+    disk_required = job.fileStore.getGlobalFileSize(bam_reads_file_id) * 2 + (2 * 1024**3)
+    requeued = ensure_disk_bytes(job, run_split_bam_reads, disk_required)
+    if requeued is not None:
+        # Not enough disk: the job was re-queued with more, so return the promise.
+        return requeued
+
     RealtimeLogger.info("Starting bam split")
     start_time = timeit.default_timer()
 
@@ -484,7 +504,8 @@ def run_chunk_alignment(job, context, gam_input_reads, bam_input_reads, sample_n
     Takes a dict from index type to index file ID. Some indexes are extra and
     specifying them will change mapping behavior.
     """
-    
+
+    # TODO: Work out what indexes we will want and ensure_disk_bytes
 
     RealtimeLogger.info("Starting {} alignment on {} chunk {}".format(
         mapper, sample_name, chunk_id))
@@ -758,12 +779,24 @@ def run_merge_gams(job, context, sample_name, id_ranges_file_id, gam_chunk_file_
         total_running_time += float(running_time)
 
     return chr_gam_ids, total_running_time
-    
+
 def run_merge_chrom_gam(job, context, sample_name, chr_name, chunk_file_ids):
     """
-    Make a chromosome gam by merging up a bunch of gam ids, one
-    for each shard.
+    Make a chromosome gam by merging up a list of gam ids, one
+    for each shard.
     """
+
+    RealtimeLogger.info('For chrom {}, merge files: {}'.format(chr_name, chunk_file_ids))
+    RealtimeLogger.info('Args: {} {} {} {} {}'.format(job, context, sample_name, chr_name, chunk_file_ids))
+
+    # Check disk requirements to make sure we have enough room for 2 copies of everything, plus a bit.
+    # Get file size in a way that is robust to strs
+    disk_required = sum((job.fileStore.getGlobalFileSize(gam) for gam in chunk_file_ids)) * 2 + (2 * 1024**3)
+    requeued = ensure_disk_bytes(job, run_merge_chrom_gam, disk_required)
+    if requeued is not None:
+        # Not enough disk: the job was re-queued with more, so return the promise.
+        return requeued
+
     # Define work directory for docker calls
     work_dir = job.fileStore.getLocalTempDir()
@@ -776,6 +809,7 @@ def run_merge_chrom_gam(job, context, sample_name, chr_name, chunk_file_ids):
     with open(output_file, 'a') as merge_file:
         for chunk_gam_id in chunk_file_ids:
             tmp_gam_file = os.path.join(work_dir, 'tmp_{}.gam'.format(uuid4()))
+            RealtimeLogger.info('Download file ID {} to {}'.format(chunk_gam_id, tmp_gam_file))
             job.fileStore.readGlobalFile(chunk_gam_id, tmp_gam_file)
             with open(tmp_gam_file) as tmp_f:
                 shutil.copyfileobj(tmp_f, merge_file)