Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 71 additions & 15 deletions simple_video_utils/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ class VideoMetadata(NamedTuple):
width: int
height: int
fps: float
nb_frames: int | None # from the header or counted by demuxing; estimated for non-seekable input
nb_frames: int | None # best effort: header, cross-checked against duration×fps, decoded on disagreement
time_base: str | None
duration: float | None # seconds; None if the container header doesn't carry one
rotation: int = 0 # display-matrix rotation in degrees; width/height already account for it
Expand All @@ -36,10 +36,11 @@ def _open_container(source: str | io.BytesIO):

def _count_video_packets(container: av.container.InputContainer) -> int | None:
"""
Count video frames exactly by demuxing packets (no decoding), then rewind.
Count video packets by demuxing (no decoding), then rewind.

Video codecs carry one frame per packet, so the packet count is the frame
count. Requires a seekable container; returns None if demuxing fails.
Video codecs carry one frame per packet, so this approximates the frame
count — but buggy files can carry trailing packets that never decode.
Requires a seekable container; returns None if demuxing fails.
"""
try:
return sum(1 for packet in container.demux(video=0) if packet.size)
Expand All @@ -49,6 +50,54 @@ def _count_video_packets(container: av.container.InputContainer) -> int | None:
container.seek(0)


def _count_decoded_frames(container: av.container.InputContainer) -> int | None:
"""
Ground-truth frame count by decoding the whole video stream, then rewind.

Slow — O(stream duration) — so only used when cheaper signals disagree.
Requires a seekable container; returns None if decoding fails.
"""
try:
return sum(1 for _ in container.decode(video=0))
except (av.FFmpegError, OSError):
return None
finally:
container.seek(0)


def _best_effort_nb_frames(
container: av.container.InputContainer,
stream: av.VideoStream,
fps: float,
duration: float | None,
seekable: bool,
) -> int | None:
"""
Do our best to report the true frame count (see issue #4).

Container headers can lie: some MOV/MP4 files declare more frames than
actually decode, and Matroska/WebM headers often omit the count entirely.
Cross-check the cheap candidate (header, else packet count) against
duration × fps; on agreement trust it, on disagreement decode for the
ground truth. Non-seekable input can't rewind, so it gets the cheap
signals only.
"""
header = stream.frames if stream.frames > 0 else None
derived = round(duration * fps) if duration and fps else None

if not seekable:
return header if header is not None else derived

candidate = header if header is not None else _count_video_packets(container)
if candidate is None:
return derived
if derived is None or abs(candidate - derived) <= 1:
return candidate

decoded = _count_decoded_frames(container)
return decoded if decoded is not None else candidate


def _probe_rotation(container: av.container.InputContainer) -> int:
"""
Read the display-matrix rotation by decoding the first frame, then rewind.
Expand Down Expand Up @@ -99,17 +148,8 @@ def video_metadata_from_container(
else:
duration = None

if stream.frames > 0:
nb_frames = stream.frames
else:
# Matroska/WebM headers often omit the frame count. When the container
# is seekable (rotation is None — same contract as rotation probing),
# demux the packets for an exact count. Otherwise estimate; the
# container duration can overshoot the video stream, so the estimate
# may run a frame high.
nb_frames = _count_video_packets(container) if rotation is None else None
if nb_frames is None and fps and duration:
nb_frames = round(duration * fps)
# rotation is None ⇒ the container is seekable (same contract as rotation probing)
nb_frames = _best_effort_nb_frames(container, stream, fps, duration, seekable=rotation is None)

if rotation is None:
rotation = _probe_rotation(container)
Expand All @@ -132,6 +172,22 @@ def video_metadata_from_container(



def count_frames(source: str | io.BytesIO) -> int:
"""
Ground-truth frame count by decoding the entire video stream.

Slow — O(stream duration). ``video_metadata(...).nb_frames`` is the
best-effort answer and usually matches; use this when you need certainty
regardless of what the container header claims.
"""
with _open_container(source) as container:
count = _count_decoded_frames(container)
if count is None:
msg = "Failed to decode video"
raise RuntimeError(msg)
return count


def video_metadata_from_bytes(data: bytes) -> VideoMetadata:
"""Return key video stream metadata from video bytes."""
with _open_container(io.BytesIO(data)) as container:
Expand Down
Binary file added tests/assets/buggy_mov_header.mov
Binary file not shown.
19 changes: 12 additions & 7 deletions tests/test_metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@

import pytest

from simple_video_utils.frames import read_frames_exact
from simple_video_utils.metadata import video_metadata, video_metadata_from_bytes
from simple_video_utils.metadata import count_frames, video_metadata, video_metadata_from_bytes


class TestVideoMetadata:
Expand Down Expand Up @@ -78,14 +77,20 @@ def test_webm_file(self):
assert meta.duration is not None
assert meta.duration > 0

@pytest.mark.parametrize("filename", ["no_nb_frames.mkv", "no_nb_frames.webm"])
def test_nb_frames_exact_when_header_missing(self, filename):
"""Matroska/WebM without nb_frames in the header: counted exactly by demuxing."""
@pytest.mark.parametrize(
"filename",
[
"no_nb_frames.mkv", # Matroska without nb_frames in the header
"no_nb_frames.webm", # WebM without nb_frames in the header
"buggy_mov_header.mov", # header declares more frames than actually decode
],
)
def test_nb_frames_matches_decoded_frames(self, filename):
"""nb_frames must match the true decoded count even when the header is absent or lies."""
video = str(Path(__file__).parent / "assets" / filename)

meta = video_metadata(video)
decoded_frames = list(read_frames_exact(video))
assert meta.nb_frames == len(decoded_frames)
assert meta.nb_frames == count_frames(video)

def test_invalid_utf8_metadata_video(self):
"""Test a video whose stray data streams carry non-UTF-8 handler_name metadata."""
Expand Down