From 4fc82c8c6295e46869742271e2c2a6520ec8f330 Mon Sep 17 00:00:00 2001 From: Adam Novak Date: Fri, 12 Mar 2021 17:07:07 -0800 Subject: [PATCH 01/14] Add a coalesce feature that needs a test --- src/toil_vg/vg_construct.py | 108 ++++++++++++++++++++++++++++++++---- 1 file changed, 97 insertions(+), 11 deletions(-) diff --git a/src/toil_vg/vg_construct.py b/src/toil_vg/vg_construct.py index d693f799..6aa0de37 100644 --- a/src/toil_vg/vg_construct.py +++ b/src/toil_vg/vg_construct.py @@ -479,7 +479,26 @@ def run_subtract_alt_regions(job, context, alt_regions_id, regions): alt_regions.add(toks[3]) return [region for region in regions if region not in alt_regions], list(alt_regions) - + +def run_read_coalesce_list(job, context, coalesce_regions_id): + """ + Read the given input file. + Produce a list of sets of region/contig names to coalesce into single jobs. + Treats the input file as tab-separated, one set per line. + """ + + work_dir = job.fileStore.getLocalTempDir() + coalesce_regions_path = os.path.join(work_dir, 'coalesce-regions.tsv') + job.fileStore.readGlobalFile(coalesce_regions_id, coalesce_regions_path) + coalesce_regions = [] + with open(coalesce_regions_path) as in_regions: + for line in in_regions: + toks = line.strip().split('\t') + if len(toks) > 0 and toks[0] != '' and not toks[0][0] != '#': + coalesce_regions.append(set(toks)) + + return coalesce_regions + def run_generate_input_vcfs(job, context, vcf_ids, vcf_names, tbi_ids, regions, output_name, do_primary = False, @@ -692,7 +711,7 @@ def run_construct_all(job, context, fasta_ids, fasta_names, vcf_inputs, wanted_indexes = set(), haplo_extraction_sample = None, haplotypes = [0,1], gbwt_prune = False, normalize = False, validate = False, alt_regions_id = None, - alt_regions = []): + alt_regions = [], coalesce_regions = []): """ construct many graphs in parallel, optionally doing indexing too. 
vcf_inputs is a list of tuples as created by run_generate_input_vcfs @@ -700,6 +719,10 @@ def run_construct_all(job, context, fasta_ids, fasta_names, vcf_inputs, Returns a list of tuples of the form (vg_ids, vg_names, indexes), where indexes is the index dict from index type to file ID. + If coalesce_regions is set (and alt_regions is not), it must be a list of + sets of region names. Each set of region names will be run together as a + single construction job, replacing the individual jobs for those regions. + """ output = [] @@ -714,7 +737,8 @@ def run_construct_all(job, context, fasta_ids, fasta_names, vcf_inputs, max_node_size, ('gbwt' in wanted_indexes) or haplo_extraction or alt_paths, flat_alts, handle_svs, regions, region_names, sort_ids, join_ids, name, merge_output_name, - normalize and name != 'haplo', validate, alt_regions_id) + normalize and name != 'haplo', validate, alt_regions_id, + coalesce_regions=coalesce_regions) mapping_id = construct_job.rv('mapping') @@ -863,14 +887,20 @@ def run_construct_all(job, context, fasta_ids, fasta_names, vcf_inputs, def run_construct_genome_graph(job, context, fasta_ids, fasta_names, vcf_ids, vcf_names, tbi_ids, max_node_size, alt_paths, flat_alts, handle_svs, regions, region_names, - sort_ids, join_ids, name, merge_output_name, normalize, validate, alt_regions_id): + sort_ids, join_ids, name, merge_output_name, normalize, validate, + alt_regions_id, coalesce_regions=[]): """ Construct graphs from one or more FASTA files and zero or more VCFs. If regions and region_names are set, constructs only for the specified - regions, and constructs one graph per region. Otherwise, constructs one - graph overall on a single default region. + regions, and constructs one graph per region (except if coalesce_regions is + set; see below). Otherwise, constructs one graph overall on a single + default region. + + If coalesce_regions is set (and alt_regions_id is not), it must be a list + of sets of region names. 
Each set of region names will be run together as a + single construction job, replacing the individual jobs for those regions. If merge_output_name is set, merges all constructed graphs together and outputs them under that name. Otherwise, outputs each graph constructed @@ -895,7 +925,44 @@ def run_construct_genome_graph(job, context, fasta_ids, fasta_names, vcf_ids, vc work_dir = job.fileStore.getLocalTempDir() if not regions: - regions, region_names = [None], ['genome'] + regions, region_names = [None], ['genome'] + + + if not alt_regions_id: + # Coalesce regions (which we can't yet do if also runnign MSGA) + wanted_regions = set(regions) + # These will replace regions and region_names if we coalesce away regions + coalesced_regions = [] + coalesced_names = [] + coalesced_away = set() + + for to_coalesce in coalesce_regions: + # Find out if we have all the regions that need to be coalesced here. + have_all = True + for region_name in to_coalesce: + if region_name not in wanted_regions: + have_all = False + break + if have_all: + # Use this coalescing + coalesced_regions.append(to_coalesce) + coalesced_names.append("coalesced{}".format(len(coalesced_regions))) + # And skip these regions + coalesced_away += to_coalesce + + if len(coalesced_away) > 0: + # Drop the coalesced regions from regions + remaining_regions = [] + remaining_names = [] + for i in range(len(regions)): + if regions[i] not in coalesced_away: + remaining_regions.append(regions[i]) + remaining_names.append(region_names[i]) + # And replace the original regions with the coalesced ones, and the + # remaining uncoalesced regions. Put the remaining ones first because + # they are probably big chromosomes and may have associated VCFs. 
+ regions = remaining_regions + coalesced_regions + region_names = remaining_names + coalesced_names region_graph_ids = [] for i, (region, region_name) in enumerate(zip(regions, region_names)): @@ -1047,6 +1114,9 @@ def run_construct_region_graph(job, context, fasta_id, fasta_name, vcf_id, vcf_n """ Construct a graph from the vcf for a given region and return its file id. + region may be a FASTA contig id, a set of FASTA contig IDs, or None for + using the whole FASTA. + If is_chrom is set, pass along that fact to the constructor so it doesn't try to pass a region out of the chromosome name. @@ -1075,7 +1145,11 @@ def run_construct_region_graph(job, context, fasta_id, fasta_name, vcf_id, vcf_n if vcf_id: cmd += ['--vcf', os.path.basename(vcf_file)] if region: - cmd += ['--region', region] + if isinstance(region, str): + cmd += ['--region', region] + else: + for fasta_id in region: + cmd += ['--region', fasta_id] if is_chrom: cmd += ['--region-is-chrom'] if max_node_size: @@ -1677,7 +1751,8 @@ def construct_main(context, options): if options.regions_file: inputRegionsFileID = importer.load(options.regions_file) - alt_regions_id = importer.load(options.alt_regions_bed) if options.alt_regions_bed else None + alt_regions_id = importer.load(options.alt_regions_bed) if options.alt_regions_bed else None + coalesce_contigs_id = importer.load(options.coalesce_contigs) if options.coalesce_contigs else None importer.wait() inputFastaFileIDs = importer.resolve(inputFastaFileIDs) @@ -1686,6 +1761,7 @@ def construct_main(context, options): inputBWAFastaID = importer.resolve(inputBWAFastaID) inputRegionsFileID = importer.resolve(inputRegionsFileID) alt_regions_id = importer.resolve(alt_regions_id) + coalesce_contigs_id = importer.resolve(coalesce_contigs_id) # We only support one haplotype extraction sample (enforced by validate) despire what CLI implies haplo_extraction_sample = options.haplo_sample if options.haplo_sample else options.sample_graph @@ -1786,6 +1862,15 @@ def 
construct_main(context, options): regions, alt_regions = cur_job.rv(0), cur_job.rv(1) else: alt_regions=[] + + if coalesce_contigs_id: + cur_job = cur_job.addFollowOnJob(run_read_coalesce_list, + context, + coalesce_contigs_id) + coalesce_contigs = cur_job.rv() + + else: + coalesce_contigs=[] # Merge up comma-separated vcfs with bcftools merge cur_job = cur_job.addFollowOnJobFn(run_merge_all_vcfs, context, @@ -1808,7 +1893,7 @@ def construct_main(context, options): filter_samples = filter_samples, min_afs = options.min_af, vcf_subdir = '{}-vcfs'.format(options.out_name) if options.keep_vcfs else None) - + # Construct graphs vcf_job.addFollowOnJobFn(run_construct_all, context, inputFastaFileIDs, inputFastaNames, vcf_job.rv(), @@ -1822,7 +1907,8 @@ def construct_main(context, options): normalize = options.normalize, validate = options.validate, alt_regions_id = alt_regions_id, - alt_regions = alt_regions) + alt_regions = alt_regions + coalesce_contigs = coalesce_contigs) if inputBWAFastaID: From f456e816a22bb454724fa9633477904bf8d2e923 Mon Sep 17 00:00:00 2001 From: Adam Novak Date: Mon, 15 Mar 2021 11:31:08 -0700 Subject: [PATCH 02/14] Make test for region coalescing and add some informative logging --- src/toil_vg/test/test_vg.py | 48 ++++++++++++++++++++++++++++++++++--- src/toil_vg/vg_construct.py | 29 +++++++++++++++------- 2 files changed, 65 insertions(+), 12 deletions(-) diff --git a/src/toil_vg/test/test_vg.py b/src/toil_vg/test/test_vg.py index 9b8dbf5a..012330b8 100644 --- a/src/toil_vg/test/test_vg.py +++ b/src/toil_vg/test/test_vg.py @@ -54,7 +54,7 @@ def _download_input(self, filename, local_path = None): def setUp(self): # Set this to True to poke around in the outsores for debug purposes - self.saveWorkDir = False + self.saveWorkDir = True self.workdir = './toil-vgci_work' if self.saveWorkDir else tempfile.mkdtemp() if not os.path.exists(self.workdir): os.makedirs(self.workdir) @@ -404,7 +404,7 @@ def test_06_BRCA1_NA12877(self): 
self._assertOutput('NA12877', self.local_outstore, f1_threshold=0.45) def test_07_BRCA1_BRCA2_NA12877(self): - ''' Test pipeline on chase with two chromosomes, in this case both BRCA regions + ''' Test pipeline on case with two chromosomes, in this case both BRCA regions ''' self._download_input('NA12877.brca1.brca2.bam.fq.gz') self._download_input('snp1kg-brca1.vg') @@ -595,7 +595,49 @@ def _test_09_construct(self, container_override): if prev_vg_size is not None: assert vg_size <= prev_vg_size prev_vg_size = vg_size - + + def test_10_construct_multiple_contigs(self): + ''' Test ability to group construction jobs + ''' + self._download_input('platinum_NA12877_BRCA1_BRCA2.vcf.gz.tbi') + self._download_input('platinum_NA12877_BRCA1_BRCA2.vcf.gz') + self._download_input('BRCA1_BRCA2.fa.gz') + + in_vcf = self._ci_input_path('platinum_NA12877_BRCA1_BRCA2.vcf.gz') + in_tbi = in_vcf + '.tbi' + in_fa = self._ci_input_path('BRCA1_BRCA2.fa.gz') + out_name = 'allbrca' + + print("Construct to " + self.local_outstore) + + command = ['toil-vg', 'construct', self.jobStoreLocal, self.local_outstore, + '--container', self.containerType, + '--clean', 'never', + '--fasta', in_fa, '--vcf', in_vcf, '--regions', '13', '17', '--remove_chr_prefix', + '--out_name', out_name, '--pangenome', '--filter_ceph', '--min_af', '0.01', + '--realTimeLogging', '--logInfo'] + self._run(command) + self._run(['toil', 'clean', self.jobStoreLocal]) + + # Pangenome should leave individual graphs. For some reason they aren't contig named. 
+ self.assertTrue(os.path.isfile(os.path.join(self.local_outstore, '0.vg'))) + self.assertTrue(os.path.isfile(os.path.join(self.local_outstore, '1.vg'))) + # Also individual filtered graphs + self.assertTrue(os.path.isfile(os.path.join(self.local_outstore, '{}_filtered_0.vg'.format(out_name)))) + self.assertTrue(os.path.isfile(os.path.join(self.local_outstore, '{}_filtered_1.vg'.format(out_name)))) + + in_coalesce_regions = os.path.join(self.local_outstore, 'coalesce.tsv') + with open(in_coalesce_regions, 'w') as to_coalesce: + to_coalesce.write('13\t17\n') + command += ['--coalesce_regions', in_coalesce_regions] + + self._run(command) + self._run(['toil', 'clean', self.jobStoreLocal]) + + # We need coalesced versions of all the graph types. + self.assertTrue(os.path.isfile(os.path.join(self.local_outstore, out_name + '_coalesced0.vg'))) + self.assertTrue(os.path.isfile(os.path.join(self.local_outstore, out_name + '_filtered_coalesced0.vg'))) + def test_11_gbwt(self): ''' Test that the gbwt gets constructed without crashing (but not much beyond that) diff --git a/src/toil_vg/vg_construct.py b/src/toil_vg/vg_construct.py index 6aa0de37..c07ef324 100644 --- a/src/toil_vg/vg_construct.py +++ b/src/toil_vg/vg_construct.py @@ -59,6 +59,8 @@ def construct_subparser(parser): parser.add_argument("--alt_regions_bed", type=make_url, help="BED file mapping alt regions (cols 1-3) to sequence names (col 4) from the FASTA. 
" "Alt regions will be aligned to the graph using vg msga") + parser.add_argument("--coalesce_regions", type=make_url, + help="File of tab-separated sets of sequence names for batching construction jobs, one per line.") parser.add_argument("--max_node_size", type=int, default=32, help="Maximum node length") parser.add_argument("--alt_paths", action="store_true", @@ -739,6 +741,8 @@ def run_construct_all(job, context, fasta_ids, fasta_names, vcf_inputs, region_names, sort_ids, join_ids, name, merge_output_name, normalize and name != 'haplo', validate, alt_regions_id, coalesce_regions=coalesce_regions) + + RealtimeLogger.info('Kick off construction of {}'.format((name, (vcf_ids, vcf_names, tbi_ids, output_name, region_names)))) mapping_id = construct_job.rv('mapping') @@ -926,7 +930,8 @@ def run_construct_genome_graph(job, context, fasta_ids, fasta_names, vcf_ids, vc if not regions: regions, region_names = [None], ['genome'] - + + RealtimeLogger.info('Before coalesce: {}, {}'.format(regions, region_names)) if not alt_regions_id: # Coalesce regions (which we can't yet do if also runnign MSGA) @@ -963,6 +968,8 @@ def run_construct_genome_graph(job, context, fasta_ids, fasta_names, vcf_ids, vc # they are probably big chromosomes and may have associated VCFs. 
regions = remaining_regions + coalesced_regions region_names = remaining_names + coalesced_names + + RealtimeLogger.info('After coalesce: {}, {}'.format(regions, region_names)) region_graph_ids = [] for i, (region, region_name) in enumerate(zip(regions, region_names)): @@ -1091,6 +1098,7 @@ def run_join_graphs(job, context, region_graph_ids, join_ids, region_names, name context.runner.call(job, cmd, work_dir=work_dir, outfile = merge_file) # And write the merged graph as an output file + RealtimeLogger.info('Save merged graph: {}'.format(merge_output_name)) to_return['merged'] = context.write_output_file(job, os.path.join(work_dir, merge_output_name)) if join_ids and len(region_files) != 1: @@ -1103,6 +1111,7 @@ def run_join_graphs(job, context, region_graph_ids, join_ids, region_names, name # No merging happened, so the id-joined files need to be output files. # We assume they came in as intermediate files, even if we didn't join them. # So we defintiely have to write them. + RealtimeLogger.info('Save id-merged graphs: {}'.format(region_files)) to_return['joined'] = [context.write_output_file(job, os.path.join(work_dir, f)) for f in region_files] return to_return @@ -1141,6 +1150,8 @@ def run_construct_region_graph(job, context, fasta_id, fasta_name, vcf_id, vcf_n job.fileStore.readGlobalFile(vcf_id, vcf_file) job.fileStore.readGlobalFile(tbi_id, vcf_file + '.tbi') + RealtimeLogger.info('Construct region graph for {}'.format(region_name)) + cmd = ['vg', 'construct', '--reference', os.path.basename(fasta_file)] if vcf_id: cmd += ['--vcf', os.path.basename(vcf_file)] @@ -1752,7 +1763,7 @@ def construct_main(context, options): inputRegionsFileID = importer.load(options.regions_file) alt_regions_id = importer.load(options.alt_regions_bed) if options.alt_regions_bed else None - coalesce_contigs_id = importer.load(options.coalesce_contigs) if options.coalesce_contigs else None + coalesce_regions_id = importer.load(options.coalesce_regions) if options.coalesce_regions 
else None importer.wait() inputFastaFileIDs = importer.resolve(inputFastaFileIDs) @@ -1761,7 +1772,7 @@ def construct_main(context, options): inputBWAFastaID = importer.resolve(inputBWAFastaID) inputRegionsFileID = importer.resolve(inputRegionsFileID) alt_regions_id = importer.resolve(alt_regions_id) - coalesce_contigs_id = importer.resolve(coalesce_contigs_id) + coalesce_regions_id = importer.resolve(coalesce_regions_id) # We only support one haplotype extraction sample (enforced by validate) despire what CLI implies haplo_extraction_sample = options.haplo_sample if options.haplo_sample else options.sample_graph @@ -1863,14 +1874,14 @@ def construct_main(context, options): else: alt_regions=[] - if coalesce_contigs_id: + if coalesce_regions_id: cur_job = cur_job.addFollowOnJob(run_read_coalesce_list, context, - coalesce_contigs_id) - coalesce_contigs = cur_job.rv() + coalesce_regions_id) + coalesce_regions = cur_job.rv() else: - coalesce_contigs=[] + coalesce_regions=[] # Merge up comma-separated vcfs with bcftools merge cur_job = cur_job.addFollowOnJobFn(run_merge_all_vcfs, context, @@ -1907,8 +1918,8 @@ def construct_main(context, options): normalize = options.normalize, validate = options.validate, alt_regions_id = alt_regions_id, - alt_regions = alt_regions - coalesce_contigs = coalesce_contigs) + alt_regions = alt_regions, + coalesce_regions = coalesce_regions) if inputBWAFastaID: From e4cf982b598215e991f6e30d8a5917b72eb0d525 Mon Sep 17 00:00:00 2001 From: Adam Novak Date: Mon, 15 Mar 2021 11:53:10 -0700 Subject: [PATCH 03/14] Get coalescing test to pass --- src/toil_vg/context.py | 7 ++++--- src/toil_vg/test/test_vg.py | 20 ++++++++++---------- src/toil_vg/vg_construct.py | 23 ++++++++++++----------- 3 files changed, 26 insertions(+), 24 deletions(-) diff --git a/src/toil_vg/context.py b/src/toil_vg/context.py index fc6fcd8c..b2a629b2 100644 --- a/src/toil_vg/context.py +++ b/src/toil_vg/context.py @@ -114,22 +114,23 @@ def get_out_store(self): else: return 
None - def write_intermediate_file(self, job, path): + def write_intermediate_file(self, job, path, out_store_path = None): """ Write the file at the given path to the given job's Toil FileStore, and to the out_store if one is in use and we are trying to dump intermediate files. In the out_store, the file is saved under its basename, in the root - directory. + directory. If out_store_path is set, the file is saved there instead. Returns the Toil file ID for the written file. """ out_store = self.get_out_store() if out_store is not None and self.config.force_outstore: + name = out_store_path if out_store_path else os.path.basename(path) # Save to the out_store if it exists - out_store.write_output_file(path, os.path.basename(path)) + out_store.write_output_file(path, name) # Save to Toil return job.fileStore.writeGlobalFile(path) diff --git a/src/toil_vg/test/test_vg.py b/src/toil_vg/test/test_vg.py index 012330b8..d16ac44c 100644 --- a/src/toil_vg/test/test_vg.py +++ b/src/toil_vg/test/test_vg.py @@ -619,12 +619,12 @@ def test_10_construct_multiple_contigs(self): self._run(command) self._run(['toil', 'clean', self.jobStoreLocal]) - # Pangenome should leave individual graphs. For some reason they aren't contig named. - self.assertTrue(os.path.isfile(os.path.join(self.local_outstore, '0.vg'))) - self.assertTrue(os.path.isfile(os.path.join(self.local_outstore, '1.vg'))) - # Also individual filtered graphs - self.assertTrue(os.path.isfile(os.path.join(self.local_outstore, '{}_filtered_0.vg'.format(out_name)))) - self.assertTrue(os.path.isfile(os.path.join(self.local_outstore, '{}_filtered_1.vg'.format(out_name)))) + for middle in ['_', '_filter_', '_minaf_0.01_']: + # Should leave individual graphs, named sensibly. 
+ self.assertTrue(os.path.isfile(os.path.join(self.local_outstore, '{}{}13.vg'.format(out_name, middle)))) + self.assertTrue(os.path.isfile(os.path.join(self.local_outstore, '{}{}17.vg'.format(out_name, middle)))) + # Should not leave a coalesced region + self.assertFalse(os.path.isfile(os.path.join(self.local_outstore, '{}{}coalesce0.vg'.format(out_name, middle)))) in_coalesce_regions = os.path.join(self.local_outstore, 'coalesce.tsv') with open(in_coalesce_regions, 'w') as to_coalesce: @@ -634,10 +634,10 @@ def test_10_construct_multiple_contigs(self): self._run(command) self._run(['toil', 'clean', self.jobStoreLocal]) - # We need coalesced versions of all the graph types. - self.assertTrue(os.path.isfile(os.path.join(self.local_outstore, out_name + '_coalesced0.vg'))) - self.assertTrue(os.path.isfile(os.path.join(self.local_outstore, out_name + '_filtered_coalesced0.vg'))) - + for middle in ['_', '_filter_', '_minaf_0.01_']: + # Should now leave a coalesced region + self.assertFalse(os.path.isfile(os.path.join(self.local_outstore, '{}{}coalesce0.vg'.format(out_name, middle)))) + def test_11_gbwt(self): ''' Test that the gbwt gets constructed without crashing (but not much beyond that) diff --git a/src/toil_vg/vg_construct.py b/src/toil_vg/vg_construct.py index c07ef324..037a0bb9 100644 --- a/src/toil_vg/vg_construct.py +++ b/src/toil_vg/vg_construct.py @@ -742,8 +742,6 @@ def run_construct_all(job, context, fasta_ids, fasta_names, vcf_inputs, normalize and name != 'haplo', validate, alt_regions_id, coalesce_regions=coalesce_regions) - RealtimeLogger.info('Kick off construction of {}'.format((name, (vcf_ids, vcf_names, tbi_ids, output_name, region_names)))) - mapping_id = construct_job.rv('mapping') # Find the joined VG files, which always exist @@ -1056,10 +1054,13 @@ def run_join_graphs(job, context, region_graph_ids, join_ids, region_names, name # Download graph for each region. # To keep command line lengths short we name the files by numbers. 
region_files = [] + # But track their human-readable names + human_names = [] for number, (region_graph_id, region_name) in enumerate(zip(region_graph_ids, region_names)): region_file = '{}.vg'.format(number) job.fileStore.readGlobalFile(region_graph_id, os.path.join(work_dir, region_file), mutable=True) region_files.append(region_file) + human_names.append(remove_ext(region_name, '.vg') + '.vg') if merge_output_name: merge_output_name = remove_ext(merge_output_name, '.vg') + '.vg' @@ -1098,12 +1099,12 @@ def run_join_graphs(job, context, region_graph_ids, join_ids, region_names, name context.runner.call(job, cmd, work_dir=work_dir, outfile = merge_file) # And write the merged graph as an output file - RealtimeLogger.info('Save merged graph: {}'.format(merge_output_name)) to_return['merged'] = context.write_output_file(job, os.path.join(work_dir, merge_output_name)) if join_ids and len(region_files) != 1: # If we do all the merging, and we made new joined graphs, write the joined graphs as intermediates - to_return['joined'] = [context.write_intermediate_file(job, os.path.join(work_dir, f)) for f in region_files] + to_return['joined'] = [context.write_intermediate_file(job, os.path.join(work_dir, f), dest) + for f, dest in zip(region_files, human_names)] else: # We can just pass through the existing intermediate files without re-uploading to_return['joined'] = region_graph_ids @@ -1111,8 +1112,10 @@ def run_join_graphs(job, context, region_graph_ids, join_ids, region_names, name # No merging happened, so the id-joined files need to be output files. # We assume they came in as intermediate files, even if we didn't join them. # So we defintiely have to write them. 
- RealtimeLogger.info('Save id-merged graphs: {}'.format(region_files)) - to_return['joined'] = [context.write_output_file(job, os.path.join(work_dir, f)) for f in region_files] + # And we need to make sure to write them under their assigned + # region-based names, even if we downloaded them to shorter names. + to_return['joined'] = [context.write_output_file(job, os.path.join(work_dir, f), dest) + for f, dest in zip(region_files, human_names)] return to_return @@ -1150,8 +1153,6 @@ def run_construct_region_graph(job, context, fasta_id, fasta_name, vcf_id, vcf_n job.fileStore.readGlobalFile(vcf_id, vcf_file) job.fileStore.readGlobalFile(tbi_id, vcf_file + '.tbi') - RealtimeLogger.info('Construct region graph for {}'.format(region_name)) - cmd = ['vg', 'construct', '--reference', os.path.basename(fasta_file)] if vcf_id: cmd += ['--vcf', os.path.basename(vcf_file)] @@ -1875,9 +1876,9 @@ def construct_main(context, options): alt_regions=[] if coalesce_regions_id: - cur_job = cur_job.addFollowOnJob(run_read_coalesce_list, - context, - coalesce_regions_id) + cur_job = cur_job.addFollowOnJobFn(run_read_coalesce_list, + context, + coalesce_regions_id) coalesce_regions = cur_job.rv() else: From 50d64f9b0f6e3ce0f7e83af99425f607ea8b3e62 Mon Sep 17 00:00:00 2001 From: Adam Novak Date: Mon, 15 Mar 2021 11:55:04 -0700 Subject: [PATCH 04/14] Fix test to test for actual coalescing --- src/toil_vg/test/test_vg.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/toil_vg/test/test_vg.py b/src/toil_vg/test/test_vg.py index d16ac44c..2fabe08e 100644 --- a/src/toil_vg/test/test_vg.py +++ b/src/toil_vg/test/test_vg.py @@ -636,7 +636,7 @@ def test_10_construct_multiple_contigs(self): for middle in ['_', '_filter_', '_minaf_0.01_']: # Should now leave a coalesced region - self.assertFalse(os.path.isfile(os.path.join(self.local_outstore, '{}{}coalesce0.vg'.format(out_name, middle)))) + self.assertTrue(os.path.isfile(os.path.join(self.local_outstore, 
'{}{}coalesce0.vg'.format(out_name, middle)))) def test_11_gbwt(self): ''' From 65939d89899046988d316149d641ebd3e95d1f38 Mon Sep 17 00:00:00 2001 From: Adam Novak Date: Mon, 15 Mar 2021 12:55:50 -0700 Subject: [PATCH 05/14] Do some indexing in the test and use a vg that actually handles multiple region flags --- src/toil_vg/test/test_vg.py | 8 ++++++-- src/toil_vg/vg_config.py | 4 ++-- src/toil_vg/vg_construct.py | 33 ++++++++++++++++++--------------- 3 files changed, 26 insertions(+), 19 deletions(-) diff --git a/src/toil_vg/test/test_vg.py b/src/toil_vg/test/test_vg.py index 2fabe08e..7eaccc64 100644 --- a/src/toil_vg/test/test_vg.py +++ b/src/toil_vg/test/test_vg.py @@ -599,6 +599,8 @@ def _test_09_construct(self, container_override): def test_10_construct_multiple_contigs(self): ''' Test ability to group construction jobs ''' + + # This has no phasings so we can't build a GBWT. self._download_input('platinum_NA12877_BRCA1_BRCA2.vcf.gz.tbi') self._download_input('platinum_NA12877_BRCA1_BRCA2.vcf.gz') self._download_input('BRCA1_BRCA2.fa.gz') @@ -613,8 +615,10 @@ def test_10_construct_multiple_contigs(self): command = ['toil-vg', 'construct', self.jobStoreLocal, self.local_outstore, '--container', self.containerType, '--clean', 'never', - '--fasta', in_fa, '--vcf', in_vcf, '--regions', '13', '17', '--remove_chr_prefix', + '--fasta', in_fa, '--vcf', in_vcf, '--vcf_phasing', in_vcf, + '--regions', '13', '17', '--remove_chr_prefix', '--out_name', out_name, '--pangenome', '--filter_ceph', '--min_af', '0.01', + '--xg_index', '--realTimeLogging', '--logInfo'] self._run(command) self._run(['toil', 'clean', self.jobStoreLocal]) @@ -624,7 +628,7 @@ def test_10_construct_multiple_contigs(self): self.assertTrue(os.path.isfile(os.path.join(self.local_outstore, '{}{}13.vg'.format(out_name, middle)))) self.assertTrue(os.path.isfile(os.path.join(self.local_outstore, '{}{}17.vg'.format(out_name, middle)))) # Should not leave a coalesced region - 
self.assertFalse(os.path.isfile(os.path.join(self.local_outstore, '{}{}coalesce0.vg'.format(out_name, middle)))) + self.assertFalse(os.path.isfile(os.path.join(self.local_outstore, '{}{}coalesced0.vg'.format(out_name, middle)))) in_coalesce_regions = os.path.join(self.local_outstore, 'coalesce.tsv') with open(in_coalesce_regions, 'w') as to_coalesce: diff --git a/src/toil_vg/vg_config.py b/src/toil_vg/vg_config.py index 89a2ea88..fa0c9a09 100644 --- a/src/toil_vg/vg_config.py +++ b/src/toil_vg/vg_config.py @@ -156,7 +156,7 @@ ## of through docker. # Docker image to use for vg -vg-docker: 'quay.io/vgteam/vg:v1.28.0' +vg-docker: 'quay.io/vgteam/vg:ci-2850-58eed1cb13f9444849933685a7f51a8dc9273ca6' # Docker image to use for bcftools bcftools-docker: 'quay.io/biocontainers/bcftools:1.9--h4da6232_0' @@ -463,7 +463,7 @@ ## of through docker. # Docker image to use for vg -vg-docker: 'quay.io/vgteam/vg:v1.24.0' +vg-docker: 'quay.io/vgteam/vg:ci-2850-58eed1cb13f9444849933685a7f51a8dc9273ca6' # Docker image to use for bcftools bcftools-docker: 'quay.io/biocontainers/bcftools:1.9--h4da6232_0' diff --git a/src/toil_vg/vg_construct.py b/src/toil_vg/vg_construct.py index 037a0bb9..569a1912 100644 --- a/src/toil_vg/vg_construct.py +++ b/src/toil_vg/vg_construct.py @@ -496,7 +496,7 @@ def run_read_coalesce_list(job, context, coalesce_regions_id): with open(coalesce_regions_path) as in_regions: for line in in_regions: toks = line.strip().split('\t') - if len(toks) > 0 and toks[0] != '' and not toks[0][0] != '#': + if len(toks) > 0 and toks[0] != '' and toks[0][0] != '#': coalesce_regions.append(set(toks)) return coalesce_regions @@ -928,31 +928,36 @@ def run_construct_genome_graph(job, context, fasta_ids, fasta_names, vcf_ids, vc if not regions: regions, region_names = [None], ['genome'] - - RealtimeLogger.info('Before coalesce: {}, {}'.format(regions, region_names)) - + if not alt_regions_id: - # Coalesce regions (which we can't yet do if also runnign MSGA) + # Coalesce regions 
(which we can't yet do if also running MSGA) wanted_regions = set(regions) + # We need to fake the output names for the coalesced regions based on + # the original ones. So map from region to region name. + region_to_name = dict(zip(regions, region_names)) # These will replace regions and region_names if we coalesce away regions coalesced_regions = [] coalesced_names = [] coalesced_away = set() - + for to_coalesce in coalesce_regions: # Find out if we have all the regions that need to be coalesced here. have_all = True - for region_name in to_coalesce: - if region_name not in wanted_regions: - have_all = False - break + for region in to_coalesce: + if region not in wanted_regions: + have_all = False + break if have_all: # Use this coalescing coalesced_regions.append(to_coalesce) - coalesced_names.append("coalesced{}".format(len(coalesced_regions))) + + # Try and replace the region in its name, if possible, when naming the coalesced region. + region_in_name = region_to_name[region].rfind(region) + base_name = region_to_name[region][:region_in_name] if region_in_name != -1 else region_to_name[region] + coalesced_names.append("{}coalesced{}".format(base_name, len(coalesced_regions - 1))) # And skip these regions - coalesced_away += to_coalesce - + coalesced_away.update(to_coalesce) + if len(coalesced_away) > 0: # Drop the coalesced regions from regions remaining_regions = [] @@ -967,8 +972,6 @@ def run_construct_genome_graph(job, context, fasta_ids, fasta_names, vcf_ids, vc regions = remaining_regions + coalesced_regions region_names = remaining_names + coalesced_names - RealtimeLogger.info('After coalesce: {}, {}'.format(regions, region_names)) - region_graph_ids = [] for i, (region, region_name) in enumerate(zip(regions, region_names)): if not vcf_ids or (len(vcf_ids) > 1 and i >= len(vcf_ids)): From cff8282a289f53be8675277a1bcfe8a6ee35203a Mon Sep 17 00:00:00 2001 From: Adam Novak Date: Mon, 15 Mar 2021 13:29:27 -0700 Subject: [PATCH 06/14] Check for coalesced 
graphs under the right names --- src/toil_vg/test/test_vg.py | 4 ++-- src/toil_vg/vg_construct.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/toil_vg/test/test_vg.py b/src/toil_vg/test/test_vg.py index 7eaccc64..53acce0d 100644 --- a/src/toil_vg/test/test_vg.py +++ b/src/toil_vg/test/test_vg.py @@ -54,7 +54,7 @@ def _download_input(self, filename, local_path = None): def setUp(self): # Set this to True to poke around in the outsores for debug purposes - self.saveWorkDir = True + self.saveWorkDir = False self.workdir = './toil-vgci_work' if self.saveWorkDir else tempfile.mkdtemp() if not os.path.exists(self.workdir): os.makedirs(self.workdir) @@ -640,7 +640,7 @@ def test_10_construct_multiple_contigs(self): for middle in ['_', '_filter_', '_minaf_0.01_']: # Should now leave a coalesced region - self.assertTrue(os.path.isfile(os.path.join(self.local_outstore, '{}{}coalesce0.vg'.format(out_name, middle)))) + self.assertTrue(os.path.isfile(os.path.join(self.local_outstore, '{}{}coalesced0.vg'.format(out_name, middle)))) def test_11_gbwt(self): ''' diff --git a/src/toil_vg/vg_construct.py b/src/toil_vg/vg_construct.py index 569a1912..1ca13272 100644 --- a/src/toil_vg/vg_construct.py +++ b/src/toil_vg/vg_construct.py @@ -954,7 +954,7 @@ def run_construct_genome_graph(job, context, fasta_ids, fasta_names, vcf_ids, vc # Try and replace the region in its name, if possible, when naming the coalesced region. 
region_in_name = region_to_name[region].rfind(region) base_name = region_to_name[region][:region_in_name] if region_in_name != -1 else region_to_name[region] - coalesced_names.append("{}coalesced{}".format(base_name, len(coalesced_regions - 1))) + coalesced_names.append("{}coalesced{}".format(base_name, len(coalesced_regions) - 1)) # And skip these regions coalesced_away.update(to_coalesce) From d7c8160f11f6efd0da1f8dd01ceb968bd48f1d0d Mon Sep 17 00:00:00 2001 From: Adam Novak Date: Wed, 17 Mar 2021 10:58:31 -0700 Subject: [PATCH 07/14] Deduplicate regions that become the same after renaming --- src/toil_vg/vg_construct.py | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/src/toil_vg/vg_construct.py b/src/toil_vg/vg_construct.py index 1ca13272..edb1cda3 100644 --- a/src/toil_vg/vg_construct.py +++ b/src/toil_vg/vg_construct.py @@ -360,15 +360,23 @@ def run_fix_chrom_names(job, context, to_ucsc, regions, fasta_ids, fasta_names, out_regions = [] something_to_rename = False - # map the regions + # Map the regions. + # Some regions may map to the same region as a previous region if e.g. we + # fetched region names out of the FASTA. So deduplicate here. 
+ done_regions = set() for region in regions: region_name = region.split(':')[0] if region_name in name_map: something_to_rename = True - out_regions.append(name_map[region_name] + region[len(region_name):]) + renamed_region = name_map[region_name] + region[len(region_name):] + if renamed_region not in done_regions: + out_regions.append(renamed_region) + done_regions.add(renamed_region) else: something_to_rename = something_to_rename or region_name in list(name_map.values()) - out_regions.append(region) + if region not in done_regions: + out_regions.append(region) + done_regions.add(region) # map the vcf out_vcf_ids = [] @@ -769,6 +777,8 @@ def run_construct_all(job, context, fasta_ids, fasta_names, vcf_inputs, # Get the chromosome names chroms = [p.split(':')[0] for p in regions] + RealtimeLogger.info('Operating on chromosomes: %s', chroms) + # Make sure we have no more than 1 region per chromosome. # Otherwise GBWT region restriction will mess things up. assert(len(chroms) == len(set(chroms))) From 0ac9c9f3bb3cfa001a64b5de7c0e74065e8970ef Mon Sep 17 00:00:00 2001 From: Adam Novak Date: Fri, 19 Mar 2021 10:21:41 -0700 Subject: [PATCH 08/14] Make sure graph combining has enough memory and disk to store whole genome graphs --- src/toil_vg/vg_construct.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/src/toil_vg/vg_construct.py b/src/toil_vg/vg_construct.py index edb1cda3..a56b84bd 100644 --- a/src/toil_vg/vg_construct.py +++ b/src/toil_vg/vg_construct.py @@ -824,8 +824,8 @@ def run_construct_all(job, context, fasta_ids, fasta_names, vcf_inputs, join_job = sample_job.addFollowOnJobFn(run_join_graphs, context, sample_job.rv(), False, region_names, name, sample_merge_output_name, cores=context.config.construct_cores, - memory=context.config.construct_mem, - disk=context.config.construct_disk) + memory=context.config.xg_index_mem, + disk=context.config.xg_index_disk) # Want to keep a whole-genome withref xg index around for mapeval 
purposes
         if len(regions) > 1 and ('xg' in wanted_indexes):
@@ -1032,8 +1032,8 @@ def run_construct_genome_graph(job, context, fasta_ids, fasta_names, vcf_ids, vc
     return child_job.addFollowOnJobFn(run_join_graphs, context, region_graph_ids, join_ids, region_names,
                                       name, merge_output_name,
                                       cores=context.config.construct_cores,
-                                      memory=context.config.construct_mem,
-                                      disk=context.config.construct_disk).rv()
+                                      memory=context.config.xg_index_mem,
+                                      disk=context.config.xg_index_disk).rv()
 
 def run_join_graphs(job, context, region_graph_ids, join_ids, region_names, name, merge_output_name = None):
     """
@@ -1858,7 +1858,8 @@ def construct_main(context, options):
         regions = options.regions
 
     # Preproces chromosome names everywhere to be consistent,
-    # either mapping from 1-->chr1 etc, or going the other way
+    # either mapping from 1-->chr1 etc, or going the other way.
+    # Deduplicate regions that become the same after renaming.
     if options.add_chr_prefix or options.remove_chr_prefix:
         cur_job = cur_job.addFollowOnJobFn(run_fix_chrom_names, context,
                                            options.add_chr_prefix,

From d9ef5559024f200f8f9f82d62147efb449754e55 Mon Sep 17 00:00:00 2001
From: Adam Novak
Date: Fri, 19 Mar 2021 17:11:23 -0700
Subject: [PATCH 09/14] Make tests replicate the indexing error we see in the
 full-scale GRCh38 build

---
 src/toil_vg/test/test_vg.py | 40 ++++++++++++-------------------------
 1 file changed, 13 insertions(+), 27 deletions(-)

diff --git a/src/toil_vg/test/test_vg.py b/src/toil_vg/test/test_vg.py
index 53acce0d..9d644c3e 100644
--- a/src/toil_vg/test/test_vg.py
+++ b/src/toil_vg/test/test_vg.py
@@ -597,43 +597,29 @@ def _test_09_construct(self, container_override):
             prev_vg_size = vg_size
 
     def test_10_construct_multiple_contigs(self):
-        ''' Test ability to group construction jobs
+        ''' Test ability to group construction jobs, in the shape of GRCh38
         '''
-        # This has no phasings so we can't build a GBWT.
- self._download_input('platinum_NA12877_BRCA1_BRCA2.vcf.gz.tbi') - self._download_input('platinum_NA12877_BRCA1_BRCA2.vcf.gz') - self._download_input('BRCA1_BRCA2.fa.gz') + chrom_names = [f'{x}' for x in range(1,23)] + ['X', 'Y'] + vcf_bases = [f'chr{x}' for x in chrom_names] + ['others'] + region_names = chrom_names + ['chrM'] - in_vcf = self._ci_input_path('platinum_NA12877_BRCA1_BRCA2.vcf.gz') - in_tbi = in_vcf + '.tbi' - in_fa = self._ci_input_path('BRCA1_BRCA2.fa.gz') - out_name = 'allbrca' + in_vcfs = [self._ci_input_path(f'GRCh38.1000gp.fake.{vcf_base}.vcf.gz') for vcf_base in vcf_bases] + in_tbis = [in_vcf + '.tbi' for in_vcf in in_vcfs] + in_fa = self._ci_input_path('GRCh38.fake.fa') + in_coalesce_regions = self._ci_input_path('GRCh38.1000gp.fake.minor_contigs.tsv') + out_name = 'Fake1000GP' print("Construct to " + self.local_outstore) command = ['toil-vg', 'construct', self.jobStoreLocal, self.local_outstore, '--container', self.containerType, '--clean', 'never', - '--fasta', in_fa, '--vcf', in_vcf, '--vcf_phasing', in_vcf, - '--regions', '13', '17', '--remove_chr_prefix', + '--fasta', in_fa, '--vcf'] + in_vcfs + ['--vcf_phasing'] + in_vcfs + [ + '--regions'] + region_names + ['--fasta_regions', '--remove_chr_prefix', '--out_name', out_name, '--pangenome', '--filter_ceph', '--min_af', '0.01', - '--xg_index', - '--realTimeLogging', '--logInfo'] - self._run(command) - self._run(['toil', 'clean', self.jobStoreLocal]) - - for middle in ['_', '_filter_', '_minaf_0.01_']: - # Should leave individual graphs, named sensibly. 
- self.assertTrue(os.path.isfile(os.path.join(self.local_outstore, '{}{}13.vg'.format(out_name, middle)))) - self.assertTrue(os.path.isfile(os.path.join(self.local_outstore, '{}{}17.vg'.format(out_name, middle)))) - # Should not leave a coalesced region - self.assertFalse(os.path.isfile(os.path.join(self.local_outstore, '{}{}coalesced0.vg'.format(out_name, middle)))) - - in_coalesce_regions = os.path.join(self.local_outstore, 'coalesce.tsv') - with open(in_coalesce_regions, 'w') as to_coalesce: - to_coalesce.write('13\t17\n') - command += ['--coalesce_regions', in_coalesce_regions] + '--all_index', + '--realTimeLogging', '--logInfo', '--coalesce_regions', in_coalesce_regions] self._run(command) self._run(['toil', 'clean', self.jobStoreLocal]) From 68509dda2fed5212be00740394f11d96226c169c Mon Sep 17 00:00:00 2001 From: Adam Novak Date: Fri, 19 Mar 2021 18:17:52 -0700 Subject: [PATCH 10/14] Make construct job responsible for picking names --- src/toil_vg/vg_common.py | 63 +++++++++++++++++++++++++++++++++++ src/toil_vg/vg_construct.py | 66 ++++++++++++------------------------- src/toil_vg/vg_index.py | 38 ++++++++++++++++----- 3 files changed, 113 insertions(+), 54 deletions(-) diff --git a/src/toil_vg/vg_common.py b/src/toil_vg/vg_common.py index 32d214e8..1881d40d 100644 --- a/src/toil_vg/vg_common.py +++ b/src/toil_vg/vg_common.py @@ -1087,3 +1087,66 @@ def resolve(self, result): return result.result() else: return result + + +def apply_coalesce(regions, region_names=None, coalesce_regions=[]): + """ + Given a list of regions, and a list of sets of regions to coalesce if all + are present, produce a list of regions or sets of regions, preserving the + original region order among the non-coalesced regions, with the sets at the + end. + + Also takes a list of region names. If not set, the regions themselves are + used. + + If all the regions in a set are present, they will all be pulled out of the + normal ordering and put in that set at the end. 
+ + Returns (coalesced regions, coalesced region names) + """ + + if not region_names: + region_names = regions + + wanted_regions = set(regions) + # We need to fake the output names for the coalesced regions based on + # the original ones. So map from region to region name. + region_to_name = dict(zip(regions, region_names)) + # These will replace regions and region_names if we coalesce away regions + coalesced_regions = [] + coalesced_names = [] + coalesced_away = set() + + for to_coalesce in coalesce_regions: + # Find out if we have all the regions that need to be coalesced here. + have_all = True + for region in to_coalesce: + if region not in wanted_regions: + have_all = False + break + if have_all: + # Use this coalescing + coalesced_regions.append(to_coalesce) + + # Try and replace the region in its name, if possible, when naming the coalesced region. + region_in_name = region_to_name[region].rfind(region) + base_name = region_to_name[region][:region_in_name] if region_in_name != -1 else region_to_name[region] + coalesced_names.append("{}coalesced{}".format(base_name, len(coalesced_regions) - 1)) + # And skip these regions + coalesced_away.update(to_coalesce) + + if len(coalesced_away) > 0: + # Drop the coalesced regions from regions + remaining_regions = [] + remaining_names = [] + for i in range(len(regions)): + if regions[i] not in coalesced_away: + remaining_regions.append(regions[i]) + remaining_names.append(region_names[i]) + # And replace the original regions with the coalesced ones, and the + # remaining uncoalesced regions. Put the remaining ones first because + # they are probably big chromosomes and may have associated VCFs. 
+ regions = remaining_regions + coalesced_regions + region_names = remaining_names + coalesced_names + + return (regions, region_names) diff --git a/src/toil_vg/vg_construct.py b/src/toil_vg/vg_construct.py index a56b84bd..1a8c5c8c 100644 --- a/src/toil_vg/vg_construct.py +++ b/src/toil_vg/vg_construct.py @@ -755,7 +755,7 @@ def run_construct_all(job, context, fasta_ids, fasta_names, vcf_inputs, # Find the joined VG files, which always exist joined_vg_ids = construct_job.rv('joined') # And give them names - joined_vg_names = [remove_ext(i, '.vg') + '.vg' for i in region_names] + joined_vg_names = construct_job.rv('joined_names') if merge_graphs or not regions or len(regions) < 2: # Sometimes we will have a single VG file also @@ -832,16 +832,20 @@ def run_construct_all(job, context, fasta_ids, fasta_names, vcf_inputs, wanted = set('xg') construct_job.addFollowOnJobFn(run_indexing, context, joined_vg_ids, joined_vg_names, output_name_base, chroms, [], [], - wanted=wanted) + wanted=wanted, coalesce_regions=coalesce_regions) - index_prev_job = join_job # In the indexing step below, we want to index our haplo-extracted sample graph # So replace the withref graph IDs and names with these # Find the joined VG files, which always exist joined_vg_ids = join_job.rv('joined') # And give them names - joined_vg_names = [n.replace('_withref', '') for n in joined_vg_names] + rename_job = join_job.addFollowOnJobFn(run_remove_withref, context, joined_vg_names, + cores=context.config.misc_cores, + memory=context.config.misc_mem, + disk=context.config.misc_disk) + joined_vg_names = rename_job.rv() + index_prev_job = rename_job if sample_merge_output_name: # We expect a single output graph too @@ -890,11 +894,19 @@ def run_construct_all(job, context, fasta_ids, fasta_names, vcf_inputs, wanted=wanted, gbwt_prune=gbwt_prune and 'gbwt' in wanted, gbwt_regions=gbwt_regions, - dont_restore_paths=alt_regions) + dont_restore_paths=alt_regions, + coalesce_regions=coalesce_regions) indexes = 
indexing_job.rv() output.append((joined_vg_ids, joined_vg_names, indexes)) return output + +def run_remove_withref(job, context, joined_vg_names): + """ + Return the names in joined_vg_names with '_withref' removed from them. + """ + + return [n.replace('_withref', '') for n in joined_vg_names] def run_construct_genome_graph(job, context, fasta_ids, fasta_names, vcf_ids, vcf_names, tbi_ids, @@ -941,46 +953,7 @@ def run_construct_genome_graph(job, context, fasta_ids, fasta_names, vcf_ids, vc if not alt_regions_id: # Coalesce regions (which we can't yet do if also running MSGA) - wanted_regions = set(regions) - # We need to fake the output names for the coalesced regions based on - # the original ones. So map from region to region name. - region_to_name = dict(zip(regions, region_names)) - # These will replace regions and region_names if we coalesce away regions - coalesced_regions = [] - coalesced_names = [] - coalesced_away = set() - - for to_coalesce in coalesce_regions: - # Find out if we have all the regions that need to be coalesced here. - have_all = True - for region in to_coalesce: - if region not in wanted_regions: - have_all = False - break - if have_all: - # Use this coalescing - coalesced_regions.append(to_coalesce) - - # Try and replace the region in its name, if possible, when naming the coalesced region. 
- region_in_name = region_to_name[region].rfind(region) - base_name = region_to_name[region][:region_in_name] if region_in_name != -1 else region_to_name[region] - coalesced_names.append("{}coalesced{}".format(base_name, len(coalesced_regions) - 1)) - # And skip these regions - coalesced_away.update(to_coalesce) - - if len(coalesced_away) > 0: - # Drop the coalesced regions from regions - remaining_regions = [] - remaining_names = [] - for i in range(len(regions)): - if regions[i] not in coalesced_away: - remaining_regions.append(regions[i]) - remaining_names.append(region_names[i]) - # And replace the original regions with the coalesced ones, and the - # remaining uncoalesced regions. Put the remaining ones first because - # they are probably big chromosomes and may have associated VCFs. - regions = remaining_regions + coalesced_regions - region_names = remaining_names + coalesced_names + regions, region_names = apply_coalesce(regions, region_names=region_names, coalesce_regions=coalesce_regions) region_graph_ids = [] for i, (region, region_name) in enumerate(zip(regions, region_names)): @@ -1054,6 +1027,8 @@ def run_join_graphs(job, context, region_graph_ids, join_ids, region_names, name 'joined': a list of the unmerged, id-joined graph file IDs (or the input graph(s) re-uploaded as output files if no joining occurred) + 'joined_names': a list of .vg filenames for those graphs + 'merged': the merged graph file ID, if merging occurred, or the only input graph ID, if there was only one. None otherwise. @@ -1082,6 +1057,7 @@ def run_join_graphs(job, context, region_graph_ids, join_ids, region_names, name # set to make asking for things with .rv() easier. 
to_return = { 'joined': [], + 'joined_names': human_names, 'merged': None, 'mapping': None } diff --git a/src/toil_vg/vg_index.py b/src/toil_vg/vg_index.py index 9c396676..952d9900 100755 --- a/src/toil_vg/vg_index.py +++ b/src/toil_vg/vg_index.py @@ -224,7 +224,7 @@ def run_gcsa_prune(job, context, graph_name, input_graph_id, gbwt_id, mapping_id return pruned_graph_id, mapping_id def run_gcsa_prep(job, context, input_graph_ids, - graph_names, index_name, chroms, + graph_names, index_name, chrom_gbwt_ids, node_mapping_id, skip_pruning=False, remove_paths=[]): """ @@ -825,7 +825,9 @@ def run_minimizer_indexing(job, context, input_xg_id, input_gbwt_id, index_name= def run_id_ranges(job, context, inputGraphFileIDs, graph_names, index_name, chroms): """ Make a file of chrom_name first_id last_id covering the - id ranges of all chromosomes. This is to speed up gam splitting down the road. + id ranges of all chromosomes. This is to speed up gam splitting down the road. + + Chroms is a list of contig names or sets of contig names that correspond to each graph. """ RealtimeLogger.info("Starting id ranges...") @@ -859,6 +861,10 @@ def run_id_range(job, context, graph_id, graph_name, chrom): """ Compute a node id range for a graph (which should be an entire contig/chromosome with contiguous id space -- see vg ids) using vg stats + + Chrom is a contig name or set of contig names we expect to be in the graph. + + If multiple contigs are in the graph, we comma-separate them. 
""" work_dir = job.fileStore.getLocalTempDir() @@ -872,7 +878,10 @@ def run_id_range(job, context, graph_id, graph_name, chrom): stats_out = context.runner.call(job, command, work_dir=work_dir, check_output = True).strip().split() assert stats_out[0].decode('ascii') == 'node-id-range' first, last = stats_out[1].split(b':') - + + if isinstance(chrom, set): + chrom = ','.join(sorted(chrom)) + return chrom, first, last def run_merge_id_ranges(job, context, id_ranges, index_name): @@ -1113,7 +1122,8 @@ def run_indexing(job, context, inputGraphFileIDs, gbwt_id = None, node_mapping_id = None, wanted = set(), gbwt_prune=False, gbwt_regions=[], - dont_restore_paths=[]): + dont_restore_paths=[], + coalesce_regions=[]): """ Run indexing logic by itself. @@ -1164,9 +1174,18 @@ def run_indexing(job, context, inputGraphFileIDs, skipped. If the 'distance' index is requested, the 'trivial_snarls' and 'xg' indexes - will also be computed. + will also be computed. + + If coalesce_regions is set, it must be a list of sets of 'chroms' region + names. Each set of region names will be expected to be together in a graph + file, instead of in separate graph files. """ + + # Coalesce the chroms, so we have some sets of chroms that live in the same + # graph file. 
+ chroms, chrom_names = apply_coalesce(chroms, coalesce_regions=coalesce_regions) + # Make a master child job child_job = Job() job.addChild(child_job) @@ -1182,14 +1201,15 @@ def run_indexing(job, context, inputGraphFileIDs, RealtimeLogger.info("Running indexing: {}.".format({ 'graph_names': graph_names, 'index_name': index_name, - 'chroms': chroms, + 'chroms': chroms if len(chroms) < 100 else f'{len(chroms)} items', 'vcf_phasing_file_ids': vcf_phasing_file_ids, 'tbi_phasing_file_ids': tbi_phasing_file_ids, 'gbwt_id': gbwt_id, 'node_mapping_id': node_mapping_id, 'wanted': wanted, 'gbwt_prune': gbwt_prune, - 'bwa_fasta_id': bwa_fasta_id + 'bwa_fasta_id': bwa_fasta_id, + 'coalesce_regions': coalesce_regions if max([len(x) for x in coalesce_regions] + [0]) < 100 else '(many)' })) # This will hold the index to return @@ -1281,7 +1301,7 @@ def run_indexing(job, context, inputGraphFileIDs, # don't put them in the outstore, unless we're only doing one contig. xg_chrom_index_job = chrom_xg_root_job.addChildJobFn(run_cat_xg_indexing, context, [inputGraphFileIDs[i]], - [graph_names[i]], chrom, + [graph_names[i]], chrom_names[i], vcf_id, tbi_id, make_gbwt=('gbwt' in wanted), gbwt_regions=gbwt_regions, intermediate=(len(chroms) > 1), @@ -1340,7 +1360,7 @@ def run_indexing(job, context, inputGraphFileIDs, # We lack per-chromosome GBWTs but we have a whole genome one we can use indexes['chrom_gbwt'] = indexes['gbwt'] * len(inputGraphFileIDs) gcsa_job = gcsa_root_job.addChildJobFn(run_gcsa_prep, context, inputGraphFileIDs, - graph_names, index_name, chroms, + graph_names, index_name, indexes.get('chrom_gbwt', []) if gbwt_prune else [], node_mapping_id, remove_paths=dont_restore_paths, From 9411d73c09b17440e5a90ed43a79a3ec4db0b354 Mon Sep 17 00:00:00 2001 From: Adam Novak Date: Fri, 19 Mar 2021 18:53:27 -0700 Subject: [PATCH 11/14] Handle Docker dict return --- src/toil_vg/vg_common.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/toil_vg/vg_common.py 
b/src/toil_vg/vg_common.py index 1881d40d..38bbc810 100644 --- a/src/toil_vg/vg_common.py +++ b/src/toil_vg/vg_common.py @@ -524,6 +524,10 @@ def call_with_docker(self, job, args, work_dir, outfile, errfile, check_output, # When we get here, the container has been run, and stdout is either in the file object we sent it to or in the Docker logs. # stderr is always in the Docker logs. + + if isinstance(return_code, dict) and 'StatusCode' in return_code: + # New? Docker gives us a dict like this + return_code = return_code['StatusCode'] if return_code != 0: # What were we doing? From fb5f7acec1b76641eeb392254d450bcf9ee19be4 Mon Sep 17 00:00:00 2001 From: Adam Novak Date: Mon, 22 Mar 2021 11:47:49 -0700 Subject: [PATCH 12/14] Adapt sim validation to new validate format --- src/toil_vg/vg_sim.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/toil_vg/vg_sim.py b/src/toil_vg/vg_sim.py index 92e3ee12..63cca8c8 100644 --- a/src/toil_vg/vg_sim.py +++ b/src/toil_vg/vg_sim.py @@ -280,8 +280,9 @@ def run_sim_chunk(job, context, gam, seed_base, xg_file_id, xg_annot_file_id, nu try: context.runner.call(job, cmd, work_dir = work_dir, outfile=output_json) if validate: - context.runner.call(job, ['vg', 'validate', '--xg', os.path.basename(xg_file), - '--gam', os.path.basename(gam_file)], work_dir = work_dir) + context.runner.call(job, ['vg', 'validate', + '--gam', os.path.basename(gam_file), + os.path.basename(xg_file)], work_dir = work_dir) except: # Dump everything we need to replicate the problem context.write_output_file(job, xg_file) From b3c290f7143376978e35e98ed524ca613941de01 Mon Sep 17 00:00:00 2001 From: Adam Novak Date: Mon, 22 Mar 2021 12:01:12 -0700 Subject: [PATCH 13/14] Tolerate non-graph path protobufs --- src/toil_vg/vg_construct.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/toil_vg/vg_construct.py b/src/toil_vg/vg_construct.py index 1a8c5c8c..62f042ad 100644 --- a/src/toil_vg/vg_construct.py +++ 
b/src/toil_vg/vg_construct.py @@ -1551,7 +1551,7 @@ def run_make_haplo_thread_graphs(job, context, vg_id, vg_name, output_name, chro # Now combine the two files, adding the paths to the graph vg_with_thread_as_path_path = os.path.join(work_dir, '{}{}_thread_{}_merge.vg'.format(output_name, tag, hap)) logger.info('Creating thread graph {}'.format(vg_with_thread_as_path_path)) - cmd = ['vg', 'combine', base_graph_filename, path_graph_filename] + cmd = ['vg', 'combine', '-c', base_graph_filename, path_graph_filename] with open(vg_with_thread_as_path_path, 'wb') as out_file: context.runner.call(job, cmd, work_dir = work_dir, outfile = out_file) From 4fee20e4b28829fb109bb965499e621c0de6cf0a Mon Sep 17 00:00:00 2001 From: Adam Novak Date: Thu, 27 Aug 2020 13:16:54 -0700 Subject: [PATCH 14/14] Pass ID range components around as strings This may fix #824. --- src/toil_vg/vg_index.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/src/toil_vg/vg_index.py b/src/toil_vg/vg_index.py index 952d9900..bb9aefa5 100755 --- a/src/toil_vg/vg_index.py +++ b/src/toil_vg/vg_index.py @@ -865,6 +865,8 @@ def run_id_range(job, context, graph_id, graph_name, chrom): Chrom is a contig name or set of contig names we expect to be in the graph. If multiple contigs are in the graph, we comma-separate them. + + Returns a tuple of 3 strings: chromosome name, first base number, last base number. 
""" work_dir = job.fileStore.getLocalTempDir() @@ -875,8 +877,8 @@ def run_id_range(job, context, graph_id, graph_name, chrom): #run vg stats #expect result of form node-id-range first:last command = ['vg', 'stats', '--node-id-range', os.path.basename(graph_filename)] - stats_out = context.runner.call(job, command, work_dir=work_dir, check_output = True).strip().split() - assert stats_out[0].decode('ascii') == 'node-id-range' + stats_out = context.runner.call(job, command, work_dir=work_dir, check_output = True).decode('utf-8').strip().split() + assert stats_out[0] == 'node-id-range' first, last = stats_out[1].split(b':') if isinstance(chrom, set): @@ -892,9 +894,9 @@ def run_merge_id_ranges(job, context, id_ranges, index_name): # Where do we put the id ranges tsv? id_range_filename = os.path.join(work_dir, '{}_id_ranges.tsv'.format(index_name)) - with open(id_range_filename, 'wb') as f: + with open(id_range_filename, 'w') as f: for id_range in id_ranges: - f.write('{}\t{}\t{}\n'.format(*id_range).encode()) + f.write('{}\t{}\t{}\n'.format(*id_range)) # Checkpoint index to output store return context.write_output_file(job, id_range_filename)