Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ dependencies = [
]

[project.optional-dependencies]
hdf5 = ["h5py"]
s3 = ["amplify-storage-utils[s3] @ git+https://github.com/WHOIGit/amplify-storage-utils.git@v1.7.0"]
dev = ["pytest", "pytest-asyncio"]

Expand Down
4 changes: 4 additions & 0 deletions src/ifcbkit/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,15 @@
async_blob_path,
async_class_scores_path,
async_features_path,
read_blobs,
read_class_scores,
read_features,
sync_find_product_file,
sync_list_product_files,
sync_blob_path,
sync_class_scores_path,
sync_features_path,
ClassScoresRows,
)

# Store base classes (always available)
Expand Down
149 changes: 126 additions & 23 deletions src/ifcbkit/products.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,58 @@
"""
Product file discovery for IFCB derived data (blobs, features, class scores).
Product file discovery and parsing for IFCB derived data.

Helpers for locating derived product files alongside raw data:
- find_product_file: recursive search
- list_product_files: pattern-matched listing
- Convenience: blob_path, class_scores_path, features_path
- Readers for blobs, features, and v3 class scores

Both sync and async APIs are provided.
Both sync and async discovery APIs are provided. Readers are sync-only.
"""

import csv
import os
import re
from dataclasses import dataclass
from pathlib import Path
from zipfile import ZipFile

import aiofiles.os as aios
import aiofiles.ospath as aiopath


# --- Async API ---
@dataclass(slots=True)
class ClassScoresRows:
"""Class score rows for one bin."""

class_names: list[str]
rows: list[tuple[int, dict[str, float]]]


def _parse_scalar(value: str) -> int | float:
try:
return int(value)
except ValueError:
return float(value)


def _require_fieldnames(fieldnames: list[str] | None, context: str) -> list[str]:
if not fieldnames:
raise ValueError(f"{context} is missing a header row.")
return fieldnames


def _resolve_roi_field(fieldnames: list[str], context: str) -> str:
for candidate in ("roi_number", "roiNumber"):
if candidate in fieldnames:
return candidate
raise ValueError(
f"{context} is missing an ROI number column. Expected one of: "
f"roi_number, roiNumber."
)


# --- Async discovery API ---

async def async_find_product_file(directory, filename, exhaustive=False):
"""Recursively search for a product file by name.
Expand Down Expand Up @@ -74,32 +110,32 @@ async def async_product_path(directory, filename, exhaustive=False):
"""Find a product file or raise FileNotFoundError."""
path = await async_find_product_file(directory, filename, exhaustive=exhaustive)
if not path:
raise FileNotFoundError(f'Product file {filename} not found in {directory}')
raise FileNotFoundError(f"Product file {filename} not found in {directory}")
return path


async def async_blob_path(directory, pid, version=4):
"""Find the blob ZIP file for a given PID."""
filename = f'{pid}_blobs_v{version}.zip'
filename = f"{pid}_blobs_v{version}.zip"
return await async_product_path(directory, filename)


async def async_class_scores_path(directory, pid, version=4):
"""Find the class scores CSV file for a given PID."""
filename = f'{pid}.csv'
async def async_class_scores_path(directory, pid):
"""Find the v3 class scores HDF5 file for a given PID."""
filename = f"{pid}_class.h5"
return await async_product_path(directory, filename)


async def async_features_path(directory, pid, version=4):
"""Find the features ZIP file for a given PID."""
filename = f'{pid}_features_v{version}.zip'
"""Find the features CSV file for a given PID."""
filename = f"{pid}_fea_v{version}.csv"
return await async_product_path(directory, filename)


# --- Sync API ---
# --- Sync discovery API ---

def sync_find_product_file(directory, filename, exhaustive=False):
"""Recursively search for a product file by name (synchronous).
"""Recursively search for a product file by name.

:param directory: root directory to search
:param filename: the filename to find
Expand Down Expand Up @@ -131,7 +167,7 @@ def sync_find_product_file(directory, filename, exhaustive=False):


def sync_list_product_files(directory, regex):
"""Generator yielding paths to product files matching a regex (synchronous).
"""Generator yielding paths to product files matching a regex.

:param directory: root directory to search
:param regex: regex pattern to match filenames against
Expand All @@ -144,32 +180,99 @@ def sync_list_product_files(directory, regex):
for name in names:
path = os.path.join(directory, name)
if os.path.isdir(path):
yield from sync_list_product_files(path, regex)
yield from sync_list_product_files(directory=path, regex=regex)
elif re.match(regex, name):
yield path


def sync_product_path(directory, filename, exhaustive=False):
"""Find a product file or raise FileNotFoundError (synchronous)."""
"""Find a product file or raise FileNotFoundError."""
path = sync_find_product_file(directory, filename, exhaustive=exhaustive)
if not path:
raise FileNotFoundError(f'Product file {filename} not found in {directory}')
raise FileNotFoundError(f"Product file {filename} not found in {directory}")
return path


def sync_blob_path(directory, pid, version=4):
"""Find the blob ZIP file for a given PID (synchronous)."""
filename = f'{pid}_blobs_v{version}.zip'
"""Find the blob ZIP file for a given PID."""
filename = f"{pid}_blobs_v{version}.zip"
return sync_product_path(directory, filename)


def sync_class_scores_path(directory, pid, version=4):
"""Find the class scores CSV file for a given PID (synchronous)."""
filename = f'{pid}.csv'
def sync_class_scores_path(directory, pid):
"""Find the v3 class scores HDF5 file for a given PID."""
filename = f"{pid}_class.h5"
return sync_product_path(directory, filename)


def sync_features_path(directory, pid, version=4):
"""Find the features ZIP file for a given PID (synchronous)."""
filename = f'{pid}_features_v{version}.zip'
"""Find the features CSV file for a given PID."""
filename = f"{pid}_fea_v{version}.csv"
return sync_product_path(directory, filename)


# --- Parsing API ---

def read_blobs(path: str | os.PathLike[str]):
"""Yield ``(roi_id, png_bytes)`` for every blob in the archive."""
with ZipFile(path) as zf:
for name in zf.namelist():
if not name.lower().endswith(".png"):
continue
roi_id = Path(name).name.removesuffix(".png")
yield roi_id, zf.read(name)


def read_features(
path: str | os.PathLike[str],
) -> list[tuple[int, dict[str, int | float]]]:
"""Read IFCB features CSV rows as ``(roi_number, feature_dict)`` pairs."""
rows: list[tuple[int, dict[str, int | float]]] = []
with open(path, newline="") as handle:
reader = csv.DictReader(handle)
fieldnames = _require_fieldnames(reader.fieldnames, f"Features file {path}")
roi_field = _resolve_roi_field(fieldnames, f"Features file {path}")

for row in reader:
roi_number_raw = row.get(roi_field)
if roi_number_raw in (None, ""):
raise ValueError(f"Features file {path} contains a row with no ROI number.")

values: dict[str, int | float] = {}
for key, value in row.items():
if key == roi_field or value in (None, ""):
continue
values[key] = _parse_scalar(value)
rows.append((int(roi_number_raw), values))
return rows


def read_class_scores(
path: str | os.PathLike[str],
) -> ClassScoresRows:
"""Read a v3 IFCB class scores HDF5 file."""
try:
import h5py
except ImportError as exc:
raise ImportError(
"read_class_scores requires h5py; install with `pip install ifcbkit[hdf5]`"
) from exc

with h5py.File(path, "r") as handle:
scores = handle["output_scores"][:]
class_names = [label.decode("ascii") for label in handle["class_labels"][:]]
roi_numbers = handle["roi_numbers"][:]

rows: list[tuple[int, dict[str, float]]] = []
for roi_number, score_row in zip(roi_numbers, scores, strict=True):
rows.append(
(
int(roi_number),
{
class_name: float(score)
for class_name, score in zip(class_names, score_row, strict=True)
},
)
)

return ClassScoresRows(class_names=class_names, rows=rows)
102 changes: 102 additions & 0 deletions tests/test_products.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
from zipfile import ZipFile

import pytest

from ifcbkit import (
ClassScoresRows,
read_blobs,
read_class_scores,
read_features,
sync_blob_path,
sync_class_scores_path,
sync_features_path,
)


def test_sync_features_path_uses_real_filename(tmp_path):
path = tmp_path / "D20250101T020023_IFCB010_fea_v4.csv"
path.write_text("roi_number,Area\n1,2\n")

found = sync_features_path(tmp_path, "D20250101T020023_IFCB010")

assert found == str(path)


def test_sync_class_scores_path_uses_real_filename(tmp_path):
path = tmp_path / "D20250101T020023_IFCB010_class.h5"
path.write_bytes(b"")

found = sync_class_scores_path(tmp_path, "D20250101T020023_IFCB010")

assert found == str(path)


def test_sync_blob_path_uses_real_filename(tmp_path):
path = tmp_path / "D20250101T020023_IFCB010_blobs_v4.zip"
path.write_bytes(b"")

found = sync_blob_path(tmp_path, "D20250101T020023_IFCB010")

assert found == str(path)


def test_read_features_returns_roi_number_and_scalar_dict(tmp_path):
path = tmp_path / "features.csv"
path.write_text(
"roi_number,Area,Count,MajorAxisLength\n"
"2,1.5,3,4.25\n"
"5,2.0,4,6.5\n"
)

rows = read_features(path)

assert rows == [
(2, {"Area": 1.5, "Count": 3, "MajorAxisLength": 4.25}),
(5, {"Area": 2.0, "Count": 4, "MajorAxisLength": 6.5}),
]


def test_read_features_accepts_roi_number_camel_case(tmp_path):
path = tmp_path / "features.csv"
path.write_text(
"roiNumber,Area,Count\n"
"7,1.5,3\n"
)

rows = read_features(path)

assert rows == [(7, {"Area": 1.5, "Count": 3})]


def test_read_blobs_yields_roi_id_and_png_bytes(tmp_path):
path = tmp_path / "blobs.zip"
with ZipFile(path, "w") as zf:
zf.writestr("D20250101T020023_IFCB010_00002.png", b"png-a")
zf.writestr("nested/D20250101T020023_IFCB010_00003.png", b"png-b")
zf.writestr("ignore.txt", b"nope")

rows = list(read_blobs(path))

assert rows == [
("D20250101T020023_IFCB010_00002", b"png-a"),
("D20250101T020023_IFCB010_00003", b"png-b"),
]


def test_read_class_scores_returns_class_names_and_rows(tmp_path):
h5py = pytest.importorskip("h5py")

path = tmp_path / "scores.h5"
with h5py.File(path, "w") as handle:
handle.create_dataset("output_scores", data=[[0.1, 0.9], [0.8, 0.2]])
handle.create_dataset("class_labels", data=[b"detritus", b"diatom"])
handle.create_dataset("roi_numbers", data=[2, 5])

result = read_class_scores(path)

assert isinstance(result, ClassScoresRows)
assert result.class_names == ["detritus", "diatom"]
assert result.rows == [
(2, {"detritus": 0.1, "diatom": 0.9}),
(5, {"detritus": 0.8, "diatom": 0.2}),
]
Loading