Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 68 additions & 26 deletions ddtrace/appsec/_ai_guard/_anthropic.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,15 +172,26 @@ def _tool_call_from_block(block: Any) -> ToolCall:
)


def _format_tool_result_block(block: Any) -> list[Message]:
"""Convert a user-side ``tool_result`` block to AI Guard tool message(s).
def _parts_from_content_blocks(blocks: Sequence[Any]) -> list[ContentPart]:
"""Build ContentParts from nested Anthropic content blocks."""
parts: list[ContentPart] = []
for inner in blocks:
inner_type = _get(inner, "type", "") or ""
if inner_type == "text":
text = _get(inner, "text", "") or ""
if text:
parts.append(ContentPart(type="text", text=text))
elif inner_type == "image":
img = _format_image_block(inner)
if img:
parts.append(img)
Comment thread
avara1986 marked this conversation as resolved.
elif inner_type == "document":
parts.extend(_format_document_block(inner))
return parts

Anthropic ``tool_result.content`` is either a string or a list of
``text`` / ``image`` blocks. When images are present, the message uses
a ``list[ContentPart]`` content; otherwise the parts collapse to a plain
string for compactness. ``is_error`` is intentionally not mapped --
AI Guard scans content regardless of error status.
"""

def _format_tool_result_block(block: Any) -> list[Message]:
"""Convert a user-side ``tool_result`` block to AI Guard tool message(s)."""
tool_use_id = _get(block, "tool_use_id", "") or ""
content = _get(block, "content", "")
msg = Message(role="tool", tool_call_id=tool_use_id)
Expand All @@ -200,17 +211,7 @@ def _format_tool_result_block(block: Any) -> list[Message]:
pass

if isinstance(content, list):
parts: list[ContentPart] = []
for inner in content:
inner_type = _get(inner, "type", "") or ""
if inner_type == "text":
text = _get(inner, "text", "") or ""
if text:
parts.append(ContentPart(type="text", text=text))
elif inner_type == "image":
img = _format_image_block(inner)
if img:
parts.append(img)
parts = _parts_from_content_blocks(content)
if parts:
msg["content"] = _reduce_parts(parts)
return [msg]
Expand Down Expand Up @@ -263,16 +264,13 @@ def _format_server_tool_result_block(block: Any) -> Message:
return msg


# AIDEV-NOTE: Anthropic block types that are intentionally NOT mapped to AI
# Guard content -- ``redacted_thinking`` is an encrypted blob, ``document``
# carries binary/PDF data, ``container_upload`` / ``tool_reference`` are bare
# identifiers, and the remaining ``*_tool_result`` variants are server-managed
# wrappers around content AI Guard cannot inspect. Skipping is correct; we
# only log unknown types at debug-level so support can spot integration drift.
# AIDEV-NOTE: Anthropic block types intentionally NOT mapped to AI Guard
# content; unknown types are logged at debug-level. ``document`` is NOT
# dropped (APMSP-3286): it carries model-visible content, see
# _format_document_block.
_DROPPED_BLOCK_TYPES = frozenset(
[
"redacted_thinking",
"document",
"container_upload",
"tool_reference",
"search_result",
Expand All @@ -282,6 +280,47 @@ def _format_server_tool_result_block(block: Any) -> Message:
]
)

# Marker for document content that cannot be read as text (base64/url sources).
_NON_TEXT_DOCUMENT_MARKER = "[non-text document]"


def _format_document_block(block: Any) -> list[ContentPart]:
"""Convert an Anthropic ``document`` block to scannable ContentPart(s).

Always yields at least one part (a marker for non-text sources) so a
document-only message never converts to an empty payload (APMSP-3286).
"""
parts: list[ContentPart] = []

for field in ("title", "context"):
value = _get(block, field, "") or ""
if value:
parts.append(ContentPart(type="text", text=value))

source = _get(block, "source") or {}
source_type = _get(source, "type", "") or ""
if source_type == "text":
data = _get(source, "data", "") or ""
if data:
parts.append(ContentPart(type="text", text=data))
elif source_type == "content":
inner = _get(source, "content", "")
if isinstance(inner, str):
if inner:
parts.append(ContentPart(type="text", text=inner))
else:
if not isinstance(inner, (list, tuple)):
try:
inner = list(inner)
except TypeError:
inner = []
parts.extend(_parts_from_content_blocks(inner or []))
# base64 / url / missing source: not text-readable.

if not parts:
parts.append(ContentPart(type="text", text=_NON_TEXT_DOCUMENT_MARKER))
return parts


def _format_content_blocks(blocks: Sequence[Any]) -> _ParsedBlocks:
"""Walk a list of Anthropic content blocks once and bucket them by role.
Expand Down Expand Up @@ -309,6 +348,9 @@ def _format_content_blocks(blocks: Sequence[Any]) -> _ParsedBlocks:
img_part = _format_image_block(block)
if img_part is not None:
(post_tool_parts if seen_server_tool else pre_tool_parts).append(img_part)
elif block_type == "document":
target = post_tool_parts if seen_server_tool else pre_tool_parts
target.extend(_format_document_block(block))
elif block_type in ("tool_use", "server_tool_use"):
tool_calls.append(_tool_call_from_block(block))
if block_type == "server_tool_use":
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
fixes:
- |
AI Guard: This fix resolves an issue where the Anthropic integration dropped
``document`` content blocks, which could cause evaluation to be skipped for
prompts whose only content was a document. Document text is now scanned and
binary documents leave a placeholder so evaluation still runs.
Loading
Loading