Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,265 changes: 734 additions & 531 deletions Cargo.lock

Large diffs are not rendered by default.

6 changes: 4 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,17 @@ clap = { version = "4.5.27", features = [ "derive" ] }
color-eyre = "0.6.2"
csv = "1.3.0"
needletail = "0.6.0"
niffler = { version = "2.7.0", default-features = false, features = [ "gz" ]}
niffler = { version = "3.0.0", default-features = false, features = [ "gz" ]}
numsep = "0.1.12"
rayon = "1.10.0"
reqwest = { version = "0.12.9", default-features = false, features = [ "json", "rustls-tls" ] }
reqwest-retry = "0.7.0"
reqwest-middleware = { version = "0.4.0", features = [ "json" ] }
serde = { version = "1.0.217", features = ["derive"] }
serde_json = "1.0.138"
size = "0.4.0"
sourmash = { version = "0.18.0" }
#sourmash = { version = "0.18.0" }
sourmash = { git = "https://github.com/sourmash-bio/sourmash", branch = "lirber/record_selection" }
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter", "fmt", "json"] }

Expand Down
3 changes: 3 additions & 0 deletions crates/index/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@ repository = "https://github.com/sourmash-bio/branchwater"
[dependencies]
camino.workspace = true
clap.workspace = true
color-eyre.workspace = true
csv.workspace = true
numsep.workspace = true
rayon.workspace = true
size.workspace = true
sourmash = { workspace = true, features = ["branchwater"] }
tracing.workspace = true
Expand Down
119 changes: 71 additions & 48 deletions crates/index/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
use camino::Utf8Path as Path;
use camino::Utf8PathBuf as PathBuf;
use clap::{Parser, Subcommand};
use tracing::info;
use color_eyre::eyre::Result;
use rayon::prelude::*;
use tracing::{error, info};
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};

use sourmash::collection::Collection;
use sourmash::index::revindex::{prepare_query, RevIndex, RevIndexOps};
use sourmash::manifest::Manifest;
use sourmash::manifest::{Manifest, Record};
use sourmash::prelude::*;
use sourmash::signature::{Signature, SigsTrait};
use sourmash::storage::{FSStorage, InnerStorage, ZipStorage};
Expand Down Expand Up @@ -193,18 +195,11 @@ fn gather<P: AsRef<Path>>(
info!("Loaded DB");

info!("Building counter");
let (counter, query_colors, hash_to_color) = db.prepare_gather_counters(&query);
let counter = db.prepare_gather_counters(&query, None);
// TODO: truncate on threshold?
info!("Counter built");

let matches = db.gather(
counter,
query_colors,
hash_to_color,
threshold,
&query,
Some(selection),
)?;
let matches = db.gather(counter, threshold, &query, Some(selection))?;

info!("matches: {}", matches.len());
for match_ in matches {
Expand Down Expand Up @@ -244,7 +239,7 @@ fn search<P: AsRef<Path>>(
info!("Loaded DB");

info!("Building counter");
let counter = db.counter_for_query(&query);
let counter = db.counter_for_query(&query, None);
info!("Counter built");

let matches = db.matches_from_counter(counter, threshold);
Expand Down Expand Up @@ -293,7 +288,7 @@ fn index<P: AsRef<Path>>(
Collection::from_zipfile(location)?
}
} else {
let manifest = manifest.ok_or_else(|| "Need a manifest")?;
let manifest = manifest.ok_or("Need a manifest")?;
assert!(location.as_ref().exists());
assert!(location.as_ref().is_dir());
let storage = FSStorage::builder()
Expand All @@ -303,11 +298,7 @@ fn index<P: AsRef<Path>>(
Collection::new(manifest, InnerStorage::new(storage))
};

RevIndex::create(
output.as_ref(),
collection.select(&selection)?.try_into()?,
colors,
)?;
RevIndex::create(output.as_ref(), collection.select(&selection)?.try_into()?)?;

Ok(())
}
Expand Down Expand Up @@ -361,7 +352,7 @@ fn update<P: AsRef<Path>>(
Collection::from_zipfile(location)?
}
} else {
let manifest = manifest.ok_or_else(|| "Need a manifest")?;
let manifest = manifest.ok_or("Need a manifest")?;
assert!(location.as_ref().exists());
assert!(location.as_ref().is_dir());
let storage = FSStorage::builder()
Expand Down Expand Up @@ -399,7 +390,7 @@ fn manifest<P: AsRef<Path>>(
output: Option<P>,
selection: Option<Selection>,
basepath: Option<P>,
) -> Result<(), Box<dyn std::error::Error>> {
) -> Result<()> {
use std::fs::File;
use std::io::{BufRead, BufReader, BufWriter, Write};

Expand All @@ -412,40 +403,72 @@ fn manifest<P: AsRef<Path>>(
})
.collect();

let manifest: Manifest = paths.as_slice().into();

let manifest = if let Some(selection) = selection {
manifest.select(&selection)?
} else {
manifest
};

let manifest = if let Some(basepath) = basepath {
let path: &str = basepath.as_ref().as_str();
manifest
.iter()
.map(|r| {
let mut record = r.clone();
record.set_internal_location(
r.internal_location()
.strip_prefix(path)
.expect("Error stripping")
.into(),
);
record
})
.collect::<Vec<_>>()
.into()
} else {
manifest
};
let (send, recv) = std::sync::mpsc::sync_channel(rayon::current_num_threads());

// Spawn a thread that is dedicated to printing to a buffered output
let out: Box<dyn Write + Send> = match output {
Some(path) => Box::new(BufWriter::new(File::create(path.as_ref()).unwrap())),
None => Box::new(std::io::stdout()),
};
let thrd = std::thread::spawn(move || {
let mut wtr = BufWriter::new(out);
wtr.write_all(b"# SOURMASH-MANIFEST-VERSION: 1.0\n")
.unwrap();

let mut wtr = csv::Writer::from_writer(wtr);

manifest.to_writer(out)?;
for record in recv.into_iter() {
wtr.serialize(record).unwrap();
wtr.flush().unwrap();
}
});
let basepath: Option<PathBuf> = basepath.map(|p| p.as_ref().into());

let send: Result<()> = paths.into_par_iter().try_for_each_with(send, |s, ref p| {
let sig = match Signature::from_path(p) {
Ok(s) => s,
Err(_) => {
error!("Error processing {:?}", p);
return Ok(());
}
};
sig.into_iter().try_for_each(|v| {
Record::from_sig(&v, p.as_str())
.into_iter()
.try_for_each(|mut r| {
if let Some(ref basepath) = basepath {
r.set_internal_location(
r.internal_location()
.strip_prefix(basepath.as_str())
.expect("Error stripping")
.into(),
);
};

if let Some(ref selection) = selection {
if let Ok(r) = r.select(selection) {
// we have a valid record, send it to output
s.send(r)?;
}
} else {
// no selection needed, just send the record to output
s.send(r)?;
};

Ok::<(), color_eyre::eyre::Error>(())
})
})?;

Ok(())
});

if let Err(e) = send {
error!("Unable to send internal data: {:?}", e);
}

if let Err(e) = thrd.join() {
error!("Unable to join internal thread: {:?}", e);
}

Ok(())
}
Expand Down
9 changes: 7 additions & 2 deletions crates/server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ impl AppState {

let Ok((matches, query_size)) = tokio::task::spawn_blocking(move || {
if let Some(mh) = prepare_query(query, &selection) {
let counter = db.counter_for_query(&mh);
let counter = db.counter_for_query(&mh, None);
let matches = db.matches_from_counter(counter, threshold);
Ok((matches, mh.size() as f64))
} else {
Expand All @@ -203,7 +203,12 @@ impl AppState {
let containment = size as f64 / query_size;
format!(
"{},{}",
path.split('/').last().unwrap().split('.').next().unwrap(),
path.split('/')
.next_back()
.unwrap()
.split('.')
.next()
.unwrap(),
containment
)
}));
Expand Down
18 changes: 9 additions & 9 deletions flake.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 2 additions & 9 deletions flake.nix
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
overlays = [ (import rust-overlay) ];
};

inherit (pkgs) lib;
inherit (pkgs) lib stdenv;

rustOxalica = pkgs.rust-bin.stable.latest.default.override {
#targets = [ "wasm32-wasi" ];
Expand All @@ -39,20 +39,15 @@
# our specific toolchain there.
craneLib = (crane.mkLib pkgs).overrideToolchain rustOxalica;

stdenv = if pkgs.stdenv.isDarwin then pkgs.overrideSDK pkgs.stdenv "11.0" else pkgs.stdenv;

commonArgs = {
src = ./.;
stdenv = stdenv;
preConfigure = lib.optionalString stdenv.isDarwin ''
export MACOSX_DEPLOYMENT_TARGET=10.14
'';

buildInputs = with pkgs; [
llvmPackages_16.libclang
llvmPackages_16.libcxxClang
] ++ lib.optionals stdenv.isDarwin [
darwin.apple_sdk.frameworks.Security
];

# Extra inputs can be added here
Expand Down Expand Up @@ -155,7 +150,7 @@
branchwaterNextest;
};

devShells.default = pkgs.mkShell.override { stdenv = stdenv; } (commonArgs // {
devShells.default = pkgs.mkShell.override { } (commonArgs // {
inputsFrom = builtins.attrValues self.checks;

buildInputs = with pkgs; [
Expand Down Expand Up @@ -190,8 +185,6 @@
sourmash
tox
]))
] ++ lib.optionals stdenv.isDarwin [
darwin.apple_sdk.frameworks.Security
];
shellHook = ''
export MACOSX_DEPLOYMENT_TARGET=10.14
Expand Down
Loading
Loading