Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions agentrun/conversation_service/__ots_backend_async_template.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@
)

from agentrun.conversation_service.model import (
CHECKPOINT_BLOBS_SCHEMA_VERSION,
CHECKPOINT_SCHEMA_VERSION,
CHECKPOINT_WRITES_SCHEMA_VERSION,
CONVERSATION_SCHEMA_VERSION,
ConversationEvent,
ConversationSession,
DEFAULT_APP_STATE_TABLE,
Expand All @@ -48,6 +52,9 @@
DEFAULT_STATE_SEARCH_INDEX,
DEFAULT_STATE_TABLE,
DEFAULT_USER_STATE_TABLE,
EVENT_SCHEMA_VERSION,
SCHEMA_VERSION_COLUMN,
STATE_SCHEMA_VERSION,
StateData,
StateScope,
)
Expand Down Expand Up @@ -174,6 +181,13 @@ async def init_search_index_async(self) -> None:
await self._create_conversation_search_index_async()
await self._create_state_search_index_async()

async def init_conversation_search_index_async(self) -> None:
"""仅创建 Conversation 多元索引(异步)。

索引已存在时跳过,可重复调用。
"""
await self._create_conversation_search_index_async()

async def init_checkpoint_tables_async(self) -> None:
"""创建 LangGraph checkpoint 相关的 3 张表(异步)。

Expand Down Expand Up @@ -607,6 +621,7 @@ async def put_session_async(self, session: ConversationSession) -> None:
]

attribute_columns = [
(SCHEMA_VERSION_COLUMN, CONVERSATION_SCHEMA_VERSION),
("created_at", session.created_at),
("updated_at", session.updated_at),
("is_pinned", session.is_pinned),
Expand Down Expand Up @@ -991,6 +1006,7 @@ async def put_event_async(

content_json = json.dumps(content, ensure_ascii=False)
attribute_columns = [
(SCHEMA_VERSION_COLUMN, EVENT_SCHEMA_VERSION),
("type", event_type),
("content", content_json),
("created_at", created_at),
Expand Down Expand Up @@ -1204,6 +1220,7 @@ async def put_state_async(
state_json = serialize_state(state)

put_cols: list[tuple[str, Any]] = [
(SCHEMA_VERSION_COLUMN, STATE_SCHEMA_VERSION),
("updated_at", now),
("version", version + 1),
]
Expand Down Expand Up @@ -1349,6 +1366,7 @@ async def put_checkpoint_async(
("checkpoint_id", checkpoint_id),
]
attribute_columns = [
(SCHEMA_VERSION_COLUMN, CHECKPOINT_SCHEMA_VERSION),
("checkpoint_type", checkpoint_type),
("checkpoint_data", checkpoint_data),
("metadata", metadata_json),
Expand Down Expand Up @@ -1502,6 +1520,7 @@ async def put_checkpoint_writes_async(
("task_idx", w["task_idx"]),
]
attrs = [
(SCHEMA_VERSION_COLUMN, CHECKPOINT_WRITES_SCHEMA_VERSION),
("task_id", w["task_id"]),
("task_path", w.get("task_path", "")),
("channel", w["channel"]),
Expand Down Expand Up @@ -1580,6 +1599,7 @@ async def put_checkpoint_blob_async(
("version", version),
]
attribute_columns = [
(SCHEMA_VERSION_COLUMN, CHECKPOINT_BLOBS_SCHEMA_VERSION),
("blob_type", blob_type),
("blob_data", blob_data),
]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ async def init_langchain_tables_async(self) -> None:
表或索引已存在时跳过,可重复调用。
"""
await self._backend.init_core_tables_async()
await self._backend.init_search_index_async()
await self._backend.init_conversation_search_index_async()

async def init_langgraph_tables_async(self) -> None:
"""创建 LangGraph 所需的全部表和索引(异步)。
Expand All @@ -81,7 +81,7 @@ async def init_langgraph_tables_async(self) -> None:
表或索引已存在时跳过,可重复调用。
"""
await self._backend.init_core_tables_async()
await self._backend.init_search_index_async()
await self._backend.init_conversation_search_index_async()
await self._backend.init_checkpoint_tables_async()

async def init_adk_tables_async(self) -> None:
Expand Down
30 changes: 30 additions & 0 deletions agentrun/conversation_service/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,36 @@
DEFAULT_CHECKPOINT_WRITES_TABLE = "checkpoint_writes"
DEFAULT_CHECKPOINT_BLOBS_TABLE = "checkpoint_blobs"

# ---------------------------------------------------------------------------
# OTS Schema 版本管理
#
# 每张表独立计数,用于 SDK 写入端与 Core 读取端(funagent-core)的兼容性协调。
# 每次 PutRow 时在 attribute_columns 中写入 _schema_version 字段。
# Core 端读取时检查该字段,版本不匹配时打 WARN 日志并尽力解析。
# 历史数据(无此字段)视为 v0。
#
# 升级流程:
# 1. 递增对应表的 *_SCHEMA_VERSION 常量
# 2. 在 PR 描述中记录变更的列名/类型/语义
# 3. 通知 funagent-core 侧同步更新解析逻辑和版本常量
# 4. 如涉及 breaking change,提供数据迁移指引
Comment on lines +44 to +48
Copy link

Copilot AI Apr 17, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里的升级流程要求“在 PR 描述中记录变更的列名/类型/语义”,但当前 PR 描述仍是未填写的模板内容,缺少对新增 _schema_version 列与索引初始化 API 调整的说明。建议补全 PR 描述以便评审与发布时追踪兼容性影响。

Copilot uses AI. Check for mistakes.
#
# 兼容性规则:
# - 只加不删:新增列允许,删除/重命名列视为 breaking change
# - PK 不可变:主键结构永不改变
# - 索引名不可变:Search Index 名称一旦确定不再修改
# - 语义不可变:已有列的类型和含义不改变
# ---------------------------------------------------------------------------

SCHEMA_VERSION_COLUMN = "_schema_version"

CONVERSATION_SCHEMA_VERSION = 1
EVENT_SCHEMA_VERSION = 1
STATE_SCHEMA_VERSION = 1 # state / app_state / user_state 共享
CHECKPOINT_SCHEMA_VERSION = 1
Copy link

Copilot AI Apr 17, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

该段注释说明“每次 PutRow 时在 attribute_columns 中写入 _schema_version 字段”,但实际写入路径包含 UpdateRow(state)以及 BatchWriteRow(checkpoint_writes)。另外注释写“每张表独立计数”,但 STATE_SCHEMA_VERSION 又被 state/app_state/user_state 三张表共享,表述上不一致。建议补充说明覆盖 put_row/update_row/batch_write_row,并明确 state 三张表共享同一版本(或改为分别计数)。

Copilot uses AI. Check for mistakes.
CHECKPOINT_WRITES_SCHEMA_VERSION = 1
CHECKPOINT_BLOBS_SCHEMA_VERSION = 1


# ---------------------------------------------------------------------------
# 枚举
Expand Down
61 changes: 37 additions & 24 deletions agentrun/conversation_service/ots_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@
)

from agentrun.conversation_service.model import (
CHECKPOINT_BLOBS_SCHEMA_VERSION,
CHECKPOINT_SCHEMA_VERSION,
CHECKPOINT_WRITES_SCHEMA_VERSION,
CONVERSATION_SCHEMA_VERSION,
ConversationEvent,
ConversationSession,
DEFAULT_APP_STATE_TABLE,
Expand All @@ -58,6 +62,9 @@
DEFAULT_STATE_SEARCH_INDEX,
DEFAULT_STATE_TABLE,
DEFAULT_USER_STATE_TABLE,
EVENT_SCHEMA_VERSION,
SCHEMA_VERSION_COLUMN,
STATE_SCHEMA_VERSION,
StateData,
StateScope,
)
Expand Down Expand Up @@ -242,6 +249,20 @@ def init_search_index(self) -> None:
self._create_conversation_search_index()
self._create_state_search_index()

async def init_conversation_search_index_async(self) -> None:
"""仅创建 Conversation 多元索引(异步)。

索引已存在时跳过,可重复调用。
"""
await self._create_conversation_search_index_async()

def init_conversation_search_index(self) -> None:
"""仅创建 Conversation 多元索引(同步)。

索引已存在时跳过,可重复调用。
"""
self._create_conversation_search_index()

async def init_checkpoint_tables_async(self) -> None:
"""创建 LangGraph checkpoint 相关的 3 张表(异步)。

Expand Down Expand Up @@ -991,21 +1012,13 @@ async def _create_state_search_index_async(self) -> None:
self._state_table,
)
except OTSServiceError as e:
err_str = str(e).lower()
if "already exist" in err_str or (
if "already exist" in str(e).lower() or (
hasattr(e, "code") and e.code == "OTSObjectAlreadyExist"
):
logger.warning(
"Search index %s already exists, skipping.",
self._state_search_index,
)
elif "does not exist" in err_str and "table" in err_str:
logger.warning(
"Table %s does not exist, skipping search index creation"
" for %s.",
self._state_table,
self._state_search_index,
)
else:
raise

Expand Down Expand Up @@ -1084,21 +1097,13 @@ def _create_state_search_index(self) -> None:
self._state_table,
)
except OTSServiceError as e:
err_str = str(e).lower()
if "already exist" in err_str or (
if "already exist" in str(e).lower() or (
hasattr(e, "code") and e.code == "OTSObjectAlreadyExist"
):
logger.warning(
"Search index %s already exists, skipping.",
self._state_search_index,
)
elif "does not exist" in err_str and "table" in err_str:
logger.warning(
"Table %s does not exist, skipping search index creation"
" for %s.",
self._state_table,
self._state_search_index,
)
else:
raise

Expand All @@ -1115,6 +1120,7 @@ async def put_session_async(self, session: ConversationSession) -> None:
]

attribute_columns = [
(SCHEMA_VERSION_COLUMN, CONVERSATION_SCHEMA_VERSION),
("created_at", session.created_at),
("updated_at", session.updated_at),
("is_pinned", session.is_pinned),
Expand Down Expand Up @@ -1148,6 +1154,7 @@ def put_session(self, session: ConversationSession) -> None:
]

attribute_columns = [
(SCHEMA_VERSION_COLUMN, CONVERSATION_SCHEMA_VERSION),
("created_at", session.created_at),
("updated_at", session.updated_at),
("is_pinned", session.is_pinned),
Expand Down Expand Up @@ -1844,6 +1851,7 @@ async def put_event_async(

content_json = json.dumps(content, ensure_ascii=False)
attribute_columns = [
(SCHEMA_VERSION_COLUMN, EVENT_SCHEMA_VERSION),
("type", event_type),
("content", content_json),
("created_at", created_at),
Expand Down Expand Up @@ -1918,6 +1926,7 @@ def put_event(

content_json = json.dumps(content, ensure_ascii=False)
attribute_columns = [
(SCHEMA_VERSION_COLUMN, EVENT_SCHEMA_VERSION),
("type", event_type),
("content", content_json),
("created_at", created_at),
Expand Down Expand Up @@ -2282,6 +2291,7 @@ async def put_state_async(
state_json = serialize_state(state)

put_cols: list[tuple[str, Any]] = [
(SCHEMA_VERSION_COLUMN, STATE_SCHEMA_VERSION),
("updated_at", now),
("version", version + 1),
]
Expand Down Expand Up @@ -2374,6 +2384,7 @@ def put_state(
state_json = serialize_state(state)

put_cols: list[tuple[str, Any]] = [
(SCHEMA_VERSION_COLUMN, STATE_SCHEMA_VERSION),
("updated_at", now),
("version", version + 1),
]
Expand Down Expand Up @@ -2542,7 +2553,7 @@ async def delete_state_row_async(
await self._async_client.delete_row(table_name, row, condition)

# -----------------------------------------------------------------------
# State CRUD(同步
# Checkpoint CRUD(LangGraph)(异步
# -----------------------------------------------------------------------

def delete_state_row(
Expand All @@ -2561,7 +2572,7 @@ def delete_state_row(
self._client.delete_row(table_name, row, condition)

# -----------------------------------------------------------------------
# Checkpoint CRUD(LangGraph)(异步
# Checkpoint CRUD(LangGraph)(同步
# -----------------------------------------------------------------------
Comment on lines 2555 to 2576
Copy link

Copilot AI Apr 17, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里的分段注释标题与实际方法段落不匹配:delete_state_row(同步)仍属于 State CRUD,但其前置标题被改成了“Checkpoint CRUD(异步)”;紧接着 put_checkpoint_async(异步)前的标题又写成了“Checkpoint CRUD(同步)”。建议修正分段注释,避免阅读/维护时混淆同步与异步以及 State vs Checkpoint 的边界。

Copilot uses AI. Check for mistakes.

async def put_checkpoint_async(
Expand All @@ -2582,6 +2593,7 @@ async def put_checkpoint_async(
("checkpoint_id", checkpoint_id),
]
attribute_columns = [
(SCHEMA_VERSION_COLUMN, CHECKPOINT_SCHEMA_VERSION),
("checkpoint_type", checkpoint_type),
("checkpoint_data", checkpoint_data),
("metadata", metadata_json),
Expand All @@ -2591,10 +2603,6 @@ async def put_checkpoint_async(
condition = Condition(RowExistenceExpectation.IGNORE)
await self._async_client.put_row(self._checkpoint_table, row, condition)

# -----------------------------------------------------------------------
# Checkpoint CRUD(LangGraph)(同步)
# -----------------------------------------------------------------------

def put_checkpoint(
self,
thread_id: str,
Expand All @@ -2613,6 +2621,7 @@ def put_checkpoint(
("checkpoint_id", checkpoint_id),
]
attribute_columns = [
(SCHEMA_VERSION_COLUMN, CHECKPOINT_SCHEMA_VERSION),
("checkpoint_type", checkpoint_type),
("checkpoint_data", checkpoint_data),
("metadata", metadata_json),
Expand Down Expand Up @@ -2881,6 +2890,7 @@ async def put_checkpoint_writes_async(
("task_idx", w["task_idx"]),
]
attrs = [
(SCHEMA_VERSION_COLUMN, CHECKPOINT_WRITES_SCHEMA_VERSION),
("task_id", w["task_id"]),
("task_path", w.get("task_path", "")),
("channel", w["channel"]),
Expand Down Expand Up @@ -2928,6 +2938,7 @@ def put_checkpoint_writes(
("task_idx", w["task_idx"]),
]
attrs = [
(SCHEMA_VERSION_COLUMN, CHECKPOINT_WRITES_SCHEMA_VERSION),
("task_id", w["task_id"]),
("task_path", w.get("task_path", "")),
("channel", w["channel"]),
Expand Down Expand Up @@ -3048,6 +3059,7 @@ async def put_checkpoint_blob_async(
("version", version),
]
attribute_columns = [
(SCHEMA_VERSION_COLUMN, CHECKPOINT_BLOBS_SCHEMA_VERSION),
("blob_type", blob_type),
("blob_data", blob_data),
]
Expand Down Expand Up @@ -3075,6 +3087,7 @@ def put_checkpoint_blob(
("version", version),
]
attribute_columns = [
(SCHEMA_VERSION_COLUMN, CHECKPOINT_BLOBS_SCHEMA_VERSION),
("blob_type", blob_type),
("blob_data", blob_data),
]
Expand Down
8 changes: 4 additions & 4 deletions agentrun/conversation_service/session_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ async def init_langchain_tables_async(self) -> None:
表或索引已存在时跳过,可重复调用。
"""
await self._backend.init_core_tables_async()
await self._backend.init_search_index_async()
await self._backend.init_conversation_search_index_async()
Comment on lines 109 to +114
Copy link

Copilot AI Apr 17, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

init_langchain_tables_async/init_langchain_tables 的实现已改为只创建 Conversation 的 search index(init_conversation_search_index_*),但 docstring 仍笼统描述为“多元索引”。建议在注释中明确仅创建 conversation_search_index(不包含 state_search_index),避免调用方误以为包含 State 相关索引。

Copilot uses AI. Check for mistakes.

def init_langchain_tables(self) -> None:
"""创建 LangChain 所需的全部表和索引(同步)。
Expand All @@ -120,7 +120,7 @@ def init_langchain_tables(self) -> None:
表或索引已存在时跳过,可重复调用。
"""
self._backend.init_core_tables()
self._backend.init_search_index()
self._backend.init_conversation_search_index()

async def init_langgraph_tables_async(self) -> None:
"""创建 LangGraph 所需的全部表和索引(异步)。
Expand All @@ -130,7 +130,7 @@ async def init_langgraph_tables_async(self) -> None:
表或索引已存在时跳过,可重复调用。
"""
await self._backend.init_core_tables_async()
await self._backend.init_search_index_async()
await self._backend.init_conversation_search_index_async()
await self._backend.init_checkpoint_tables_async()

def init_langgraph_tables(self) -> None:
Expand All @@ -141,7 +141,7 @@ def init_langgraph_tables(self) -> None:
表或索引已存在时跳过,可重复调用。
"""
self._backend.init_core_tables()
self._backend.init_search_index()
self._backend.init_conversation_search_index()
self._backend.init_checkpoint_tables()

async def init_adk_tables_async(self) -> None:
Expand Down
Loading
Loading