diff --git a/src/toil_vg/context.py b/src/toil_vg/context.py index fc6fcd8c..b2a629b2 100644 --- a/src/toil_vg/context.py +++ b/src/toil_vg/context.py @@ -114,22 +114,23 @@ def get_out_store(self): else: return None - def write_intermediate_file(self, job, path): + def write_intermediate_file(self, job, path, out_store_path = None): """ Write the file at the given path to the given job's Toil FileStore, and to the out_store if one is in use and we are trying to dump intermediate files. In the out_store, the file is saved under its basename, in the root - directory. + directory. If out_store_path is set, the file is saved there instead. Returns the Toil file ID for the written file. """ out_store = self.get_out_store() if out_store is not None and self.config.force_outstore: + name = out_store_path if out_store_path else os.path.basename(path) # Save to the out_store if it exists - out_store.write_output_file(path, os.path.basename(path)) + out_store.write_output_file(path, name) # Save to Toil return job.fileStore.writeGlobalFile(path) diff --git a/src/toil_vg/test/test_vg.py b/src/toil_vg/test/test_vg.py index 9b8dbf5a..9d644c3e 100644 --- a/src/toil_vg/test/test_vg.py +++ b/src/toil_vg/test/test_vg.py @@ -404,7 +404,7 @@ def test_06_BRCA1_NA12877(self): self._assertOutput('NA12877', self.local_outstore, f1_threshold=0.45) def test_07_BRCA1_BRCA2_NA12877(self): - ''' Test pipeline on chase with two chromosomes, in this case both BRCA regions + ''' Test pipeline on case with two chromosomes, in this case both BRCA regions ''' self._download_input('NA12877.brca1.brca2.bam.fq.gz') self._download_input('snp1kg-brca1.vg') @@ -595,7 +595,39 @@ def _test_09_construct(self, container_override): if prev_vg_size is not None: assert vg_size <= prev_vg_size prev_vg_size = vg_size - + + def test_10_construct_multiple_contigs(self): + ''' Test ability to group construction jobs, in the chape of GRCh38 + ''' + + chrom_names = [f'{x}' for x in range(1,23)] + ['X', 'Y'] + vcf_bases = [f'chr{x}' for x in chrom_names] + ['others'] + region_names = chrom_names + ['chrM'] + + in_vcfs = [self._ci_input_path(f'GRCh38.1000gp.fake.{vcf_base}.vcf.gz') for vcf_base in vcf_bases] + in_tbis = [in_vcf + '.tbi' for in_vcf in in_vcfs] + in_fa = self._ci_input_path('GRCh38.fake.fa') + in_coalesce_regions = self._ci_input_path('GRCh38.1000gp.fake.minor_contigs.tsv') + out_name = 'Fake1000GP' + + print("Construct to " + self.local_outstore) + + command = ['toil-vg', 'construct', self.jobStoreLocal, self.local_outstore, + '--container', self.containerType, + '--clean', 'never', + '--fasta', in_fa, '--vcf'] + in_vcfs + ['--vcf_phasing'] + in_vcfs + [ + '--regions'] + region_names + ['--fasta_regions', '--remove_chr_prefix', + '--out_name', out_name, '--pangenome', '--filter_ceph', '--min_af', '0.01', + '--all_index', + '--realTimeLogging', '--logInfo', '--coalesce_regions', in_coalesce_regions] + + self._run(command) + self._run(['toil', 'clean', self.jobStoreLocal]) + + for middle in ['_', '_filter_', '_minaf_0.01_']: + # Should now leave a coalesced region + self.assertTrue(os.path.isfile(os.path.join(self.local_outstore, '{}{}coalesced0.vg'.format(out_name, middle)))) + def test_11_gbwt(self): ''' Test that the gbwt gets constructed without crashing (but not much beyond that) diff --git a/src/toil_vg/vg_common.py b/src/toil_vg/vg_common.py index 32d214e8..38bbc810 100644 --- a/src/toil_vg/vg_common.py +++ b/src/toil_vg/vg_common.py @@ -524,6 +524,10 @@ def call_with_docker(self, job, args, work_dir, outfile, errfile, check_output, # When we get here, the container has been run, and stdout is either in the file object we sent it to or in the Docker logs. # stderr is always in the Docker logs. + + if isinstance(return_code, dict) and 'StatusCode' in return_code: + # New? Docker gives us a dict like this + return_code = return_code['StatusCode'] if return_code != 0: # What were we doing? @@ -1087,3 +1091,66 @@ def resolve(self, result): return result.result() else: return result + + +def apply_coalesce(regions, region_names=None, coalesce_regions=[]): + """ + Given a list of regions, and a list of sets of regions to coalesce if all + are present, produce a list of regions or sets of regions, preserving the + original region order among the non-coalesced regions, with the sets at the + end. + + Also takes a list of region names. If not set, the regions themselves are + used. + + If all the regions in a set are present, they will all be pulled out of the + normal ordering and put in that set at the end. + + Returns (coalesced regions, coalesced region names) + """ + + if not region_names: + region_names = regions + + wanted_regions = set(regions) + # We need to fake the output names for the coalesced regions based on + # the original ones. So map from region to region name. + region_to_name = dict(zip(regions, region_names)) + # These will replace regions and region_names if we coalesce away regions + coalesced_regions = [] + coalesced_names = [] + coalesced_away = set() + + for to_coalesce in coalesce_regions: + # Find out if we have all the regions that need to be coalesced here. + have_all = True + for region in to_coalesce: + if region not in wanted_regions: + have_all = False + break + if have_all: + # Use this coalescing + coalesced_regions.append(to_coalesce) + + # Try and replace the region in its name, if possible, when naming the coalesced region. + region_in_name = region_to_name[region].rfind(region) + base_name = region_to_name[region][:region_in_name] if region_in_name != -1 else region_to_name[region] + coalesced_names.append("{}coalesced{}".format(base_name, len(coalesced_regions) - 1)) + # And skip these regions + coalesced_away.update(to_coalesce) + + if len(coalesced_away) > 0: + # Drop the coalesced regions from regions + remaining_regions = [] + remaining_names = [] + for i in range(len(regions)): + if regions[i] not in coalesced_away: + remaining_regions.append(regions[i]) + remaining_names.append(region_names[i]) + # And replace the original regions with the coalesced ones, and the + # remaining uncoalesced regions. Put the remaining ones first because + # they are probably big chromosomes and may have associated VCFs. + regions = remaining_regions + coalesced_regions + region_names = remaining_names + coalesced_names + + return (regions, region_names) diff --git a/src/toil_vg/vg_config.py b/src/toil_vg/vg_config.py index 89a2ea88..fa0c9a09 100644 --- a/src/toil_vg/vg_config.py +++ b/src/toil_vg/vg_config.py @@ -156,7 +156,7 @@ ## of through docker. # Docker image to use for vg -vg-docker: 'quay.io/vgteam/vg:v1.28.0' +vg-docker: 'quay.io/vgteam/vg:ci-2850-58eed1cb13f9444849933685a7f51a8dc9273ca6' # Docker image to use for bcftools bcftools-docker: 'quay.io/biocontainers/bcftools:1.9--h4da6232_0' @@ -463,7 +463,7 @@ ## of through docker. # Docker image to use for vg -vg-docker: 'quay.io/vgteam/vg:v1.24.0' +vg-docker: 'quay.io/vgteam/vg:ci-2850-58eed1cb13f9444849933685a7f51a8dc9273ca6' # Docker image to use for bcftools bcftools-docker: 'quay.io/biocontainers/bcftools:1.9--h4da6232_0' diff --git a/src/toil_vg/vg_construct.py b/src/toil_vg/vg_construct.py index d693f799..62f042ad 100644 --- a/src/toil_vg/vg_construct.py +++ b/src/toil_vg/vg_construct.py @@ -59,6 +59,8 @@ def construct_subparser(parser): parser.add_argument("--alt_regions_bed", type=make_url, help="BED file mapping alt regions (cols 1-3) to sequence names (col 4) from the FASTA. " "Alt regions will be aligned to the graph using vg msga") + parser.add_argument("--coalesce_regions", type=make_url, + help="File of tab-separated sets of sequence names for batching construction jobs, one per line.") parser.add_argument("--max_node_size", type=int, default=32, help="Maximum node length") parser.add_argument("--alt_paths", action="store_true", @@ -358,15 +360,23 @@ def run_fix_chrom_names(job, context, to_ucsc, regions, fasta_ids, fasta_names, out_regions = [] something_to_rename = False - # map the regions + # Map the regions. + # Some regions may map to the same region as a previous region if e.g. we + # fetched region names out of the FASTA. So deduplicate here. + done_regions = set() for region in regions: region_name = region.split(':')[0] if region_name in name_map: something_to_rename = True - out_regions.append(name_map[region_name] + region[len(region_name):]) + renamed_region = name_map[region_name] + region[len(region_name):] + if renamed_region not in done_regions: + out_regions.append(renamed_region) + done_regions.add(renamed_region) else: something_to_rename = something_to_rename or region_name in list(name_map.values()) - out_regions.append(region) + if region not in done_regions: + out_regions.append(region) + done_regions.add(region) # map the vcf out_vcf_ids = [] @@ -479,7 +489,26 @@ def run_subtract_alt_regions(job, context, alt_regions_id, regions): alt_regions.add(toks[3]) return [region for region in regions if region not in alt_regions], list(alt_regions) - + +def run_read_coalesce_list(job, context, coalesce_regions_id): + """ + Read the given input file. + Produce a list of sets of region/contig names to coalesce into single jobs. + Treats the input file as tab-separated, one set per line. + """ + + work_dir = job.fileStore.getLocalTempDir() + coalesce_regions_path = os.path.join(work_dir, 'coalesce-regions.tsv') + job.fileStore.readGlobalFile(coalesce_regions_id, coalesce_regions_path) + coalesce_regions = [] + with open(coalesce_regions_path) as in_regions: + for line in in_regions: + toks = line.strip().split('\t') + if len(toks) > 0 and toks[0] != '' and toks[0][0] != '#': + coalesce_regions.append(set(toks)) + + return coalesce_regions + def run_generate_input_vcfs(job, context, vcf_ids, vcf_names, tbi_ids, regions, output_name, do_primary = False, @@ -692,7 +721,7 @@ def run_construct_all(job, context, fasta_ids, fasta_names, vcf_inputs, wanted_indexes = set(), haplo_extraction_sample = None, haplotypes = [0,1], gbwt_prune = False, normalize = False, validate = False, alt_regions_id = None, - alt_regions = []): + alt_regions = [], coalesce_regions = []): """ construct many graphs in parallel, optionally doing indexing too. vcf_inputs is a list of tuples as created by run_generate_input_vcfs @@ -700,6 +729,10 @@ def run_construct_all(job, context, fasta_ids, fasta_names, vcf_inputs, Returns a list of tuples of the form (vg_ids, vg_names, indexes), where indexes is the index dict from index type to file ID. + If coalesce_regions is set (and alt_regions is not), it must be a list of + sets of region names. Each set of region names will be run together as a + single construction job, replacing the individual jobs for those regions. + """ output = [] @@ -714,14 +747,15 @@ def run_construct_all(job, context, fasta_ids, fasta_names, vcf_inputs, max_node_size, ('gbwt' in wanted_indexes) or haplo_extraction or alt_paths, flat_alts, handle_svs, regions, region_names, sort_ids, join_ids, name, merge_output_name, - normalize and name != 'haplo', validate, alt_regions_id) - + normalize and name != 'haplo', validate, alt_regions_id, + coalesce_regions=coalesce_regions) + mapping_id = construct_job.rv('mapping') # Find the joined VG files, which always exist joined_vg_ids = construct_job.rv('joined') # And give them names - joined_vg_names = [remove_ext(i, '.vg') + '.vg' for i in region_names] + joined_vg_names = construct_job.rv('joined_names') if merge_graphs or not regions or len(regions) < 2: # Sometimes we will have a single VG file also @@ -743,6 +777,8 @@ def run_construct_all(job, context, fasta_ids, fasta_names, vcf_inputs, # Get the chromosome names chroms = [p.split(':')[0] for p in regions] + RealtimeLogger.info('Operating on chromosomes: %s', chroms) + # Make sure we have no more than 1 region per chromosome. # Otherwise GBWT region restriction will mess things up. assert(len(chroms) == len(set(chroms))) @@ -788,24 +824,28 @@ def run_construct_all(job, context, fasta_ids, fasta_names, vcf_inputs, join_job = sample_job.addFollowOnJobFn(run_join_graphs, context, sample_job.rv(), False, region_names, name, sample_merge_output_name, cores=context.config.construct_cores, - memory=context.config.construct_mem, - disk=context.config.construct_disk) + memory=context.config.xg_index_mem, + disk=context.config.xg_index_disk) # Want to keep a whole-genome withref xg index around for mapeval purposes if len(regions) > 1 and ('xg' in wanted_indexes): wanted = set('xg') construct_job.addFollowOnJobFn(run_indexing, context, joined_vg_ids, joined_vg_names, output_name_base, chroms, [], [], - wanted=wanted) + wanted=wanted, coalesce_regions=coalesce_regions) - index_prev_job = join_job # In the indexing step below, we want to index our haplo-extracted sample graph # So replace the withref graph IDs and names with these # Find the joined VG files, which always exist joined_vg_ids = join_job.rv('joined') # And give them names - joined_vg_names = [n.replace('_withref', '') for n in joined_vg_names] + rename_job = join_job.addFollowOnJobFn(run_remove_withref, context, joined_vg_names, + cores=context.config.misc_cores, + memory=context.config.misc_mem, + disk=context.config.misc_disk) + joined_vg_names = rename_job.rv() + index_prev_job = rename_job if sample_merge_output_name: # We expect a single output graph too @@ -854,23 +894,37 @@ def run_construct_all(job, context, fasta_ids, fasta_names, vcf_inputs, wanted=wanted, gbwt_prune=gbwt_prune and 'gbwt' in wanted, gbwt_regions=gbwt_regions, - dont_restore_paths=alt_regions) + dont_restore_paths=alt_regions, + coalesce_regions=coalesce_regions) indexes = indexing_job.rv() output.append((joined_vg_ids, joined_vg_names, indexes)) return output + +def run_remove_withref(job, context, joined_vg_names): + """ + Return the names in joined_vg_names with '_withref' removed from them. + """ + + return [n.replace('_withref', '') for n in joined_vg_names] def run_construct_genome_graph(job, context, fasta_ids, fasta_names, vcf_ids, vcf_names, tbi_ids, max_node_size, alt_paths, flat_alts, handle_svs, regions, region_names, - sort_ids, join_ids, name, merge_output_name, normalize, validate, alt_regions_id): + sort_ids, join_ids, name, merge_output_name, normalize, validate, + alt_regions_id, coalesce_regions=[]): """ Construct graphs from one or more FASTA files and zero or more VCFs. If regions and region_names are set, constructs only for the specified - regions, and constructs one graph per region. Otherwise, constructs one - graph overall on a single default region. + regions, and constructs one graph per region (except if coalesce_regions is + set; see below). Otherwise, constructs one graph overall on a single + default region. + + If coalesce_regions is set (and alt_regions_id is not), it must be a list + of sets of region names. Each set of region names will be run together as a + single construction job, replacing the individual jobs for those regions. If merge_output_name is set, merges all constructed graphs together and outputs them under that name. Otherwise, outputs each graph constructed @@ -895,8 +949,12 @@ def run_construct_genome_graph(job, context, fasta_ids, fasta_names, vcf_ids, vc work_dir = job.fileStore.getLocalTempDir() if not regions: - regions, region_names = [None], ['genome'] - + regions, region_names = [None], ['genome'] + + if not alt_regions_id: + # Coalesce regions (which we can't yet do if also running MSGA) + regions, region_names = apply_coalesce(regions, region_names=region_names, coalesce_regions=coalesce_regions) + region_graph_ids = [] for i, (region, region_name) in enumerate(zip(regions, region_names)): if not vcf_ids or (len(vcf_ids) > 1 and i >= len(vcf_ids)): @@ -947,8 +1005,8 @@ def run_construct_genome_graph(job, context, fasta_ids, fasta_names, vcf_ids, vc return child_job.addFollowOnJobFn(run_join_graphs, context, region_graph_ids, join_ids, region_names, name, merge_output_name, cores=context.config.construct_cores, - memory=context.config.construct_mem, - disk=context.config.construct_disk).rv() + memory=context.config.xg_index_mem, + disk=context.config.xg_index_disk).rv() def run_join_graphs(job, context, region_graph_ids, join_ids, region_names, name, merge_output_name = None): """ @@ -969,6 +1027,8 @@ def run_join_graphs(job, context, region_graph_ids, join_ids, region_names, name 'joined': a list of the unmerged, id-joined graph file IDs (or the input graph(s) re-uploaded as output files if no joining occurred) + 'joined_names': a list of .vg filenames for those graphs + 'merged': the merged graph file ID, if merging occurred, or the only input graph ID, if there was only one. None otherwise. @@ -982,10 +1042,13 @@ def run_join_graphs(job, context, region_graph_ids, join_ids, region_names, name # Download graph for each region. # To keep command line lengths short we name the files by numbers. region_files = [] + # But track their human-readable names + human_names = [] for number, (region_graph_id, region_name) in enumerate(zip(region_graph_ids, region_names)): region_file = '{}.vg'.format(number) job.fileStore.readGlobalFile(region_graph_id, os.path.join(work_dir, region_file), mutable=True) region_files.append(region_file) + human_names.append(remove_ext(region_name, '.vg') + '.vg') if merge_output_name: merge_output_name = remove_ext(merge_output_name, '.vg') + '.vg' @@ -994,6 +1057,7 @@ def run_join_graphs(job, context, region_graph_ids, join_ids, region_names, name # set to make asking for things with .rv() easier. to_return = { 'joined': [], + 'joined_names': human_names, 'merged': None, 'mapping': None } @@ -1028,7 +1092,8 @@ def run_join_graphs(job, context, region_graph_ids, join_ids, region_names, name if join_ids and len(region_files) != 1: # If we do all the merging, and we made new joined graphs, write the joined graphs as intermediates - to_return['joined'] = [context.write_intermediate_file(job, os.path.join(work_dir, f)) for f in region_files] + to_return['joined'] = [context.write_intermediate_file(job, os.path.join(work_dir, f), dest) + for f, dest in zip(region_files, human_names)] else: # We can just pass through the existing intermediate files without re-uploading to_return['joined'] = region_graph_ids @@ -1036,7 +1101,10 @@ def run_join_graphs(job, context, region_graph_ids, join_ids, region_names, name # No merging happened, so the id-joined files need to be output files. # We assume they came in as intermediate files, even if we didn't join them. # So we defintiely have to write them. - to_return['joined'] = [context.write_output_file(job, os.path.join(work_dir, f)) for f in region_files] + # And we need to make sure to write them under their assigned + # region-based names, even if we downloaded them to shorter names. + to_return['joined'] = [context.write_output_file(job, os.path.join(work_dir, f), dest) + for f, dest in zip(region_files, human_names)] return to_return @@ -1047,6 +1115,9 @@ def run_construct_region_graph(job, context, fasta_id, fasta_name, vcf_id, vcf_n """ Construct a graph from the vcf for a given region and return its file id. + region may be a FASTA contig id, a set of FASTA contig IDs, or None for + using the whole FASTA. + If is_chrom is set, pass along that fact to the constructor so it doesn't try to pass a region out of the chromosome name. @@ -1075,7 +1146,11 @@ def run_construct_region_graph(job, context, fasta_id, fasta_name, vcf_id, vcf_n if vcf_id: cmd += ['--vcf', os.path.basename(vcf_file)] if region: - cmd += ['--region', region] + if isinstance(region, str): + cmd += ['--region', region] + else: + for fasta_id in region: + cmd += ['--region', fasta_id] if is_chrom: cmd += ['--region-is-chrom'] if max_node_size: @@ -1476,7 +1551,7 @@ def run_make_haplo_thread_graphs(job, context, vg_id, vg_name, output_name, chro # Now combine the two files, adding the paths to the graph vg_with_thread_as_path_path = os.path.join(work_dir, '{}{}_thread_{}_merge.vg'.format(output_name, tag, hap)) logger.info('Creating thread graph {}'.format(vg_with_thread_as_path_path)) - cmd = ['vg', 'combine', base_graph_filename, path_graph_filename] + cmd = ['vg', 'combine', '-c', base_graph_filename, path_graph_filename] with open(vg_with_thread_as_path_path, 'wb') as out_file: context.runner.call(job, cmd, work_dir = work_dir, outfile = out_file) @@ -1677,7 +1752,8 @@ def construct_main(context, options): if options.regions_file: inputRegionsFileID = importer.load(options.regions_file) - alt_regions_id = importer.load(options.alt_regions_bed) if options.alt_regions_bed else None + alt_regions_id = importer.load(options.alt_regions_bed) if options.alt_regions_bed else None + coalesce_regions_id = importer.load(options.coalesce_regions) if options.coalesce_regions else None importer.wait() inputFastaFileIDs = importer.resolve(inputFastaFileIDs) @@ -1686,6 +1762,7 @@ def construct_main(context, options): inputBWAFastaID = importer.resolve(inputBWAFastaID) inputRegionsFileID = importer.resolve(inputRegionsFileID) alt_regions_id = importer.resolve(alt_regions_id) + coalesce_regions_id = importer.resolve(coalesce_regions_id) # We only support one haplotype extraction sample (enforced by validate) despire what CLI implies haplo_extraction_sample = options.haplo_sample if options.haplo_sample else options.sample_graph @@ -1757,7 +1834,8 @@ def construct_main(context, options): regions = options.regions # Preproces chromosome names everywhere to be consistent, - # either mapping from 1-->chr1 etc, or going the other way + # either mapping from 1-->chr1 etc, or going the other way. + # Deduplicate regions that become the same after renaming. if options.add_chr_prefix or options.remove_chr_prefix: cur_job = cur_job.addFollowOnJobFn(run_fix_chrom_names, context, options.add_chr_prefix, @@ -1786,6 +1864,15 @@ def construct_main(context, options): regions, alt_regions = cur_job.rv(0), cur_job.rv(1) else: alt_regions=[] + + if coalesce_regions_id: + cur_job = cur_job.addFollowOnJobFn(run_read_coalesce_list, + context, + coalesce_regions_id) + coalesce_regions = cur_job.rv() + + else: + coalesce_regions=[] # Merge up comma-separated vcfs with bcftools merge cur_job = cur_job.addFollowOnJobFn(run_merge_all_vcfs, context, @@ -1808,7 +1895,7 @@ def construct_main(context, options): filter_samples = filter_samples, min_afs = options.min_af, vcf_subdir = '{}-vcfs'.format(options.out_name) if options.keep_vcfs else None) - + # Construct graphs vcf_job.addFollowOnJobFn(run_construct_all, context, inputFastaFileIDs, inputFastaNames, vcf_job.rv(), @@ -1822,7 +1909,8 @@ def construct_main(context, options): normalize = options.normalize, validate = options.validate, alt_regions_id = alt_regions_id, - alt_regions = alt_regions) + alt_regions = alt_regions, + coalesce_regions = coalesce_regions) if inputBWAFastaID: diff --git a/src/toil_vg/vg_index.py b/src/toil_vg/vg_index.py index 9c396676..bb9aefa5 100755 --- a/src/toil_vg/vg_index.py +++ b/src/toil_vg/vg_index.py @@ -224,7 +224,7 @@ def run_gcsa_prune(job, context, graph_name, input_graph_id, gbwt_id, mapping_id return pruned_graph_id, mapping_id def run_gcsa_prep(job, context, input_graph_ids, - graph_names, index_name, chroms, + graph_names, index_name, chrom_gbwt_ids, node_mapping_id, skip_pruning=False, remove_paths=[]): """ @@ -825,7 +825,9 @@ def run_minimizer_indexing(job, context, input_xg_id, input_gbwt_id, index_name= def run_id_ranges(job, context, inputGraphFileIDs, graph_names, index_name, chroms): """ Make a file of chrom_name first_id last_id covering the - id ranges of all chromosomes. This is to speed up gam splitting down the road. + id ranges of all chromosomes. This is to speed up gam splitting down the road. + + Chroms is a list of contig names or sets of contig names that correspond to each graph. """ RealtimeLogger.info("Starting id ranges...") @@ -859,6 +861,12 @@ def run_id_range(job, context, graph_id, graph_name, chrom): """ Compute a node id range for a graph (which should be an entire contig/chromosome with contiguous id space -- see vg ids) using vg stats + + Chrom is a contig name or set of contig names we expect to be in the graph. + + If multiple contigs are in the graph, we comma-separate them. + + Returns a tuple of 3 strings: chromosome name, first base number, last base number. """ work_dir = job.fileStore.getLocalTempDir() @@ -869,10 +877,13 @@ def run_id_range(job, context, graph_id, graph_name, chrom): #run vg stats #expect result of form node-id-range first:last command = ['vg', 'stats', '--node-id-range', os.path.basename(graph_filename)] - stats_out = context.runner.call(job, command, work_dir=work_dir, check_output = True).strip().split() - assert stats_out[0].decode('ascii') == 'node-id-range' + stats_out = context.runner.call(job, command, work_dir=work_dir, check_output = True).decode('utf-8').strip().split() + assert stats_out[0] == 'node-id-range' first, last = stats_out[1].split(b':') - + + if isinstance(chrom, set): + chrom = ','.join(sorted(chrom)) + return chrom, first, last def run_merge_id_ranges(job, context, id_ranges, index_name): @@ -883,9 +894,9 @@ def run_merge_id_ranges(job, context, id_ranges, index_name): # Where do we put the id ranges tsv? id_range_filename = os.path.join(work_dir, '{}_id_ranges.tsv'.format(index_name)) - with open(id_range_filename, 'wb') as f: + with open(id_range_filename, 'w') as f: for id_range in id_ranges: - f.write('{}\t{}\t{}\n'.format(*id_range).encode()) + f.write('{}\t{}\t{}\n'.format(*id_range)) # Checkpoint index to output store return context.write_output_file(job, id_range_filename) @@ -1113,7 +1124,8 @@ def run_indexing(job, context, inputGraphFileIDs, gbwt_id = None, node_mapping_id = None, wanted = set(), gbwt_prune=False, gbwt_regions=[], - dont_restore_paths=[]): + dont_restore_paths=[], + coalesce_regions=[]): """ Run indexing logic by itself. @@ -1164,9 +1176,18 @@ def run_indexing(job, context, inputGraphFileIDs, skipped. If the 'distance' index is requested, the 'trivial_snarls' and 'xg' indexes - will also be computed. + will also be computed. + + If coalesce_regions is set, it must be a list of sets of 'chroms' region + names. Each set of region names will be expected to be together in a graph + file, instead of in separate graph files. """ + + # Coalesce the chroms, so we have some sets of chroms that live in the same + # graph file. + chroms, chrom_names = apply_coalesce(chroms, coalesce_regions=coalesce_regions) + # Make a master child job child_job = Job() job.addChild(child_job) @@ -1182,14 +1203,15 @@ def run_indexing(job, context, inputGraphFileIDs, RealtimeLogger.info("Running indexing: {}.".format({ 'graph_names': graph_names, 'index_name': index_name, - 'chroms': chroms, + 'chroms': chroms if len(chroms) < 100 else f'{len(chroms)} items', 'vcf_phasing_file_ids': vcf_phasing_file_ids, 'tbi_phasing_file_ids': tbi_phasing_file_ids, 'gbwt_id': gbwt_id, 'node_mapping_id': node_mapping_id, 'wanted': wanted, 'gbwt_prune': gbwt_prune, - 'bwa_fasta_id': bwa_fasta_id + 'bwa_fasta_id': bwa_fasta_id, + 'coalesce_regions': coalesce_regions if max([len(x) for x in coalesce_regions] + [0]) < 100 else '(many)' })) # This will hold the index to return @@ -1281,7 +1303,7 @@ def run_indexing(job, context, inputGraphFileIDs, # don't put them in the outstore, unless we're only doing one contig. xg_chrom_index_job = chrom_xg_root_job.addChildJobFn(run_cat_xg_indexing, context, [inputGraphFileIDs[i]], - [graph_names[i]], chrom, + [graph_names[i]], chrom_names[i], vcf_id, tbi_id, make_gbwt=('gbwt' in wanted), gbwt_regions=gbwt_regions, intermediate=(len(chroms) > 1), @@ -1340,7 +1362,7 @@ def run_indexing(job, context, inputGraphFileIDs, # We lack per-chromosome GBWTs but we have a whole genome one we can use indexes['chrom_gbwt'] = indexes['gbwt'] * len(inputGraphFileIDs) gcsa_job = gcsa_root_job.addChildJobFn(run_gcsa_prep, context, inputGraphFileIDs, - graph_names, index_name, chroms, + graph_names, index_name, indexes.get('chrom_gbwt', []) if gbwt_prune else [], node_mapping_id, remove_paths=dont_restore_paths, diff --git a/src/toil_vg/vg_sim.py b/src/toil_vg/vg_sim.py index 92e3ee12..63cca8c8 100644 --- a/src/toil_vg/vg_sim.py +++ b/src/toil_vg/vg_sim.py @@ -280,8 +280,9 @@ def run_sim_chunk(job, context, gam, seed_base, xg_file_id, xg_annot_file_id, nu try: context.runner.call(job, cmd, work_dir = work_dir, outfile=output_json) if validate: - context.runner.call(job, ['vg', 'validate', '--xg', os.path.basename(xg_file), - '--gam', os.path.basename(gam_file)], work_dir = work_dir) + context.runner.call(job, ['vg', 'validate', + '--gam', os.path.basename(gam_file), + os.path.basename(xg_file)], work_dir = work_dir) except: # Dump everything we need to replicate the problem context.write_output_file(job, xg_file)