Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 26 additions & 6 deletions agentrun/conversation_service/__ots_backend_async_template.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@
)

from agentrun.conversation_service.model import (
CHECKPOINT_BLOBS_SCHEMA_VERSION,
CHECKPOINT_SCHEMA_VERSION,
CHECKPOINT_WRITES_SCHEMA_VERSION,
CONVERSATION_SCHEMA_VERSION,
ConversationEvent,
ConversationSession,
DEFAULT_APP_STATE_TABLE,
Expand All @@ -48,6 +52,9 @@
DEFAULT_STATE_SEARCH_INDEX,
DEFAULT_STATE_TABLE,
DEFAULT_USER_STATE_TABLE,
EVENT_SCHEMA_VERSION,
SCHEMA_VERSION_COLUMN,
STATE_SCHEMA_VERSION,
StateData,
StateScope,
)
Expand Down Expand Up @@ -113,7 +120,7 @@ def __init__(
)

# -----------------------------------------------------------------------
# 建表(异步)/ Table creation (async)
# 建表 / Table creation
# -----------------------------------------------------------------------

async def init_tables_async(self) -> None:
Expand Down Expand Up @@ -174,6 +181,13 @@ async def init_search_index_async(self) -> None:
await self._create_conversation_search_index_async()
await self._create_state_search_index_async()

async def init_conversation_search_index_async(self) -> None:
"""仅创建 Conversation 多元索引(异步)。

索引已存在时跳过,可重复调用。
"""
await self._create_conversation_search_index_async()

async def init_checkpoint_tables_async(self) -> None:
"""创建 LangGraph checkpoint 相关的 3 张表(异步)。

Expand Down Expand Up @@ -595,7 +609,7 @@ async def _create_state_search_index_async(self) -> None:
raise

# -----------------------------------------------------------------------
# Session CRUD(异步)/ Session CRUD (async)
# Session CRUD
# -----------------------------------------------------------------------

async def put_session_async(self, session: ConversationSession) -> None:
Expand All @@ -607,6 +621,7 @@ async def put_session_async(self, session: ConversationSession) -> None:
]

attribute_columns = [
(SCHEMA_VERSION_COLUMN, CONVERSATION_SCHEMA_VERSION),
("created_at", session.created_at),
("updated_at", session.updated_at),
("is_pinned", session.is_pinned),
Expand Down Expand Up @@ -946,7 +961,7 @@ async def search_sessions_async(
return sessions, search_response.total_count or 0

# -----------------------------------------------------------------------
# Event CRUD(异步)/ Event CRUD (async)
# Event CRUD
# -----------------------------------------------------------------------

async def put_event_async(
Expand Down Expand Up @@ -991,6 +1006,7 @@ async def put_event_async(

content_json = json.dumps(content, ensure_ascii=False)
attribute_columns = [
(SCHEMA_VERSION_COLUMN, EVENT_SCHEMA_VERSION),
("type", event_type),
("content", content_json),
("created_at", created_at),
Expand Down Expand Up @@ -1171,7 +1187,7 @@ async def delete_events_by_session_async(
return deleted

# -----------------------------------------------------------------------
# State CRUD(JSON 字符串存储 + 列分片)(异步)
# State CRUD(JSON 字符串存储 + 列分片)
# -----------------------------------------------------------------------

async def put_state_async(
Expand Down Expand Up @@ -1204,6 +1220,7 @@ async def put_state_async(
state_json = serialize_state(state)

put_cols: list[tuple[str, Any]] = [
(SCHEMA_VERSION_COLUMN, STATE_SCHEMA_VERSION),
("updated_at", now),
("version", version + 1),
]
Expand Down Expand Up @@ -1328,7 +1345,7 @@ async def delete_state_row_async(
await self._async_client.delete_row(table_name, row, condition)

# -----------------------------------------------------------------------
# Checkpoint CRUD(LangGraph)(异步)
# Checkpoint CRUD(LangGraph)
# -----------------------------------------------------------------------

async def put_checkpoint_async(
Expand All @@ -1349,6 +1366,7 @@ async def put_checkpoint_async(
("checkpoint_id", checkpoint_id),
]
attribute_columns = [
(SCHEMA_VERSION_COLUMN, CHECKPOINT_SCHEMA_VERSION),
("checkpoint_type", checkpoint_type),
("checkpoint_data", checkpoint_data),
("metadata", metadata_json),
Expand Down Expand Up @@ -1502,6 +1520,7 @@ async def put_checkpoint_writes_async(
("task_idx", w["task_idx"]),
]
attrs = [
(SCHEMA_VERSION_COLUMN, CHECKPOINT_WRITES_SCHEMA_VERSION),
("task_id", w["task_id"]),
("task_path", w.get("task_path", "")),
("channel", w["channel"]),
Expand Down Expand Up @@ -1580,6 +1599,7 @@ async def put_checkpoint_blob_async(
("version", version),
]
attribute_columns = [
(SCHEMA_VERSION_COLUMN, CHECKPOINT_BLOBS_SCHEMA_VERSION),
("blob_type", blob_type),
("blob_data", blob_data),
]
Expand Down Expand Up @@ -1747,7 +1767,7 @@ async def _scan_and_delete_async(
await self._async_client.batch_write_row(request)

# -----------------------------------------------------------------------
# 内部辅助方法(I/O 相关,异步
# 内部辅助方法(I/O 相关)
# -----------------------------------------------------------------------

async def _get_chunk_count_async(
Expand Down
16 changes: 8 additions & 8 deletions agentrun/conversation_service/__session_store_async_template.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ async def init_langchain_tables_async(self) -> None:
表或索引已存在时跳过,可重复调用。
"""
await self._backend.init_core_tables_async()
await self._backend.init_search_index_async()
await self._backend.init_conversation_search_index_async()

async def init_langgraph_tables_async(self) -> None:
"""创建 LangGraph 所需的全部表和索引(异步)。
Expand All @@ -81,7 +81,7 @@ async def init_langgraph_tables_async(self) -> None:
表或索引已存在时跳过,可重复调用。
"""
await self._backend.init_core_tables_async()
await self._backend.init_search_index_async()
await self._backend.init_conversation_search_index_async()
await self._backend.init_checkpoint_tables_async()

async def init_adk_tables_async(self) -> None:
Expand All @@ -96,7 +96,7 @@ async def init_adk_tables_async(self) -> None:
await self._backend.init_search_index_async()

# -------------------------------------------------------------------
# Checkpoint 管理(LangGraph)(异步)
# Checkpoint 管理(LangGraph)
# -------------------------------------------------------------------

async def put_checkpoint_async(
Expand Down Expand Up @@ -210,7 +210,7 @@ async def delete_thread_checkpoints_async(
await self._backend.delete_thread_checkpoints_async(thread_id)

# -------------------------------------------------------------------
# Session 管理(异步)/ Session management (async)
# Session 管理 / Session management
# -------------------------------------------------------------------

async def create_session_async(
Expand Down Expand Up @@ -496,7 +496,7 @@ async def update_session_async(
)

# -------------------------------------------------------------------
# Event 管理(异步)/ Event management (async)
# Event 管理 / Event management
# -------------------------------------------------------------------

async def append_event_async(
Expand Down Expand Up @@ -631,7 +631,7 @@ async def get_recent_events_async(
return events

# -------------------------------------------------------------------
# State 管理(异步)/ State management (async)
# State 管理 / State management
# -------------------------------------------------------------------

async def get_session_state_async(
Expand Down Expand Up @@ -746,7 +746,7 @@ async def get_merged_state_async(
return merged

# -------------------------------------------------------------------
# 内部辅助方法(异步)
# 内部辅助方法
# -------------------------------------------------------------------

async def _apply_delta_async(
Expand Down Expand Up @@ -802,7 +802,7 @@ async def _apply_delta_async(
)

# -------------------------------------------------------------------
# 工厂方法(异步)/ Factory methods (async)
# 工厂方法 / Factory methods
# -------------------------------------------------------------------

@classmethod
Expand Down
35 changes: 35 additions & 0 deletions agentrun/conversation_service/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,41 @@
DEFAULT_CHECKPOINT_WRITES_TABLE = "checkpoint_writes"
DEFAULT_CHECKPOINT_BLOBS_TABLE = "checkpoint_blobs"

# ---------------------------------------------------------------------------
# OTS Schema 版本管理
#
# 用于 SDK 写入端与 Core 读取端(funagent-core)的兼容性协调。
# 每次写入行(PutRow / UpdateRow / BatchWriteRow)时在
# attribute_columns 中携带 _schema_version 字段。
# Core 端读取时检查该字段,版本不匹配时打 WARN 日志并尽力解析。
# 历史数据(无此字段)视为 v0。
#
# 版本计数规则:
# - 大部分表独立计数
# - state / app_state / user_state 三张表共享 STATE_SCHEMA_VERSION
#
# 升级流程:
# 1. 递增对应表的 *_SCHEMA_VERSION 常量
# 2. 在 PR 描述中记录变更的列名/类型/语义
# 3. 通知 funagent-core 侧同步更新解析逻辑和版本常量
# 4. 如涉及 breaking change,提供数据迁移指引
Comment on lines +44 to +48
Copy link

Copilot AI Apr 17, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里的升级流程要求“在 PR 描述中记录变更的列名/类型/语义”,但当前 PR 描述仍是未填写的模板内容,缺少对新增 _schema_version 列与索引初始化 API 调整的说明。建议补全 PR 描述以便评审与发布时追踪兼容性影响。

Copilot uses AI. Check for mistakes.
#
# 兼容性规则:
# - 只加不删:新增列允许,删除/重命名列视为 breaking change
# - PK 不可变:主键结构永不改变
# - 索引名不可变:Search Index 名称一旦确定不再修改
# - 语义不可变:已有列的类型和含义不改变
# ---------------------------------------------------------------------------

SCHEMA_VERSION_COLUMN = "_schema_version"

CONVERSATION_SCHEMA_VERSION = 1
EVENT_SCHEMA_VERSION = 1
STATE_SCHEMA_VERSION = 1 # state / app_state / user_state 共享
CHECKPOINT_SCHEMA_VERSION = 1
CHECKPOINT_WRITES_SCHEMA_VERSION = 1
CHECKPOINT_BLOBS_SCHEMA_VERSION = 1


# ---------------------------------------------------------------------------
# 枚举
Expand Down
Loading
Loading