Skip to content
Draft
Show file tree
Hide file tree
Changes from 22 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 21 additions & 2 deletions graphiti_core/driver/neptune_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,10 @@ def __init__(self, host: str, aoss_host: str, port: int = 8182, aoss_port: int =
aoss_hostname = aoss_host.replace('https://', '').replace('http://', '')

session = boto3.Session()

# Configure OpenSearch client with retry and timeout settings
# AWS OpenSearch Serverless has aggressive connection limits and may close
# connections prematurely. These settings help handle transient failures.
self.aoss_client = OpenSearch(
hosts=[{'host': aoss_hostname, 'port': aoss_port}],
http_auth=Urllib3AWSV4SignerAuth(
Expand All @@ -218,7 +222,11 @@ def __init__(self, host: str, aoss_host: str, port: int = 8182, aoss_port: int =
use_ssl=True,
verify_certs=True,
connection_class=Urllib3HttpConnection,
pool_maxsize=20,
pool_maxsize=10, # Reduced from 20 to avoid overwhelming AOSS connection limits
timeout=30, # 30 second timeout to prevent hanging connections
max_retries=5, # Enable retry logic with exponential backoff
retry_on_timeout=True, # Retry when timeout occurs
retry_on_status=[502, 503, 504], # Retry on gateway errors and service unavailable
)

# Instantiate Neptune operations
Expand Down Expand Up @@ -428,7 +436,18 @@ async def run(self, query: str | list, **kwargs: Any) -> Any:
if isinstance(query, list):
res = None
for q in query:
res = await self.driver.execute_query(q, **kwargs)
# Handle both tuple (query, params) format and plain string queries
if isinstance(q, tuple):
if len(q) >= 2:
# Unpack query and params from tuple
query_str, params = q[0], q[1]
res = await self.driver.execute_query(query_str, **params)
else:
# Single element tuple, treat as query string
res = await self.driver.execute_query(str(q[0]), **kwargs)
else:
# Plain string query
res = await self.driver.execute_query(q, **kwargs)
return res
else:
return await self.driver.execute_query(str(query), **kwargs)
28 changes: 22 additions & 6 deletions graphiti_core/models/edges/edge_db_queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,17 @@ def get_entity_edge_save_query(provider: GraphProvider, has_aoss: bool = False)
MATCH (source:Entity {uuid: $edge_data.source_uuid})
MATCH (target:Entity {uuid: $edge_data.target_uuid})
MERGE (source)-[e:RELATES_TO {uuid: $edge_data.uuid}]->(target)
SET e = removeKeyFromMap(removeKeyFromMap($edge_data, "fact_embedding"), "episodes")
SET e.fact_embedding = join([x IN coalesce($edge_data.fact_embedding, []) | toString(x) ], ",")
SET e.episodes = join($edge_data.episodes, ",")
SET e.group_id = $edge_data.group_id,
e.source_node_uuid = $edge_data.source_uuid,
e.target_node_uuid = $edge_data.target_uuid,
e.created_at = $edge_data.created_at,
e.name = $edge_data.name,
e.fact = $edge_data.fact,
e.expired_at = $edge_data.expired_at,
e.valid_at = $edge_data.valid_at,
e.invalid_at = $edge_data.invalid_at,
e.fact_embedding = join([x IN coalesce($edge_data.fact_embedding, []) | toString(x) ], ","),
e.episodes = join($edge_data.episodes, ",")
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Neptune queries silently drop custom attributes during save

High Severity

The new Neptune queries use explicit property lists instead of the previous SET e = removeKeyFromMap(...) pattern, but this omits custom attributes. Both EntityEdge and EntityNode have an attributes: dict[str, Any] field. The save() methods spread these attributes into the data dictionary via edge_data.update(self.attributes or {}). The old approach copied all properties including these spread attributes. The new explicit lists only include built-in fields, causing any custom attributes to be silently dropped on save. The same issue affects both edge queries and node queries for Neptune.

Additional Locations (1)

Fix in Cursor Fix in Web

RETURN $edge_data.uuid AS uuid
"""
case GraphProvider.KUZU:
Expand Down Expand Up @@ -140,9 +148,17 @@ def get_entity_edge_save_bulk_query(provider: GraphProvider, has_aoss: bool = Fa
MATCH (source:Entity {uuid: edge.source_node_uuid})
MATCH (target:Entity {uuid: edge.target_node_uuid})
MERGE (source)-[r:RELATES_TO {uuid: edge.uuid}]->(target)
SET r = removeKeyFromMap(removeKeyFromMap(edge, "fact_embedding"), "episodes")
SET r.fact_embedding = join([x IN coalesce(edge.fact_embedding, []) | toString(x) ], ",")
SET r.episodes = join(edge.episodes, ",")
SET r.group_id = edge.group_id,
r.source_node_uuid = edge.source_node_uuid,
r.target_node_uuid = edge.target_node_uuid,
r.created_at = edge.created_at,
r.name = edge.name,
r.fact = edge.fact,
r.expired_at = edge.expired_at,
r.valid_at = edge.valid_at,
r.invalid_at = edge.invalid_at,
r.fact_embedding = join([x IN coalesce(edge.fact_embedding, []) | toString(x) ], ","),
r.episodes = join(edge.episodes, ",")
RETURN edge.uuid AS uuid
"""
case GraphProvider.KUZU:
Expand Down
19 changes: 14 additions & 5 deletions graphiti_core/models/nodes/node_db_queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,10 @@ def get_episode_node_save_bulk_query(provider: GraphProvider) -> str:
e.group_id AS group_id,
e.source_description AS source_description,
e.source AS source,
split(e.entity_edges, "|") AS entity_edges
CASE WHEN e.entity_edges IS NULL OR e.entity_edges = ''
THEN []
ELSE split(e.entity_edges, "|")
END AS entity_edges
"""


Expand Down Expand Up @@ -157,8 +160,11 @@ def get_entity_node_save_query(provider: GraphProvider, labels: str, has_aoss: b
return f"""
MERGE (n:Entity {{uuid: $entity_data.uuid}})
{label_subquery}
SET n = removeKeyFromMap(removeKeyFromMap($entity_data, "labels"), "name_embedding")
SET n.name_embedding = join([x IN coalesce($entity_data.name_embedding, []) | toString(x) ], ",")
SET n.name = $entity_data.name,
n.group_id = $entity_data.group_id,
n.created_at = $entity_data.created_at,
n.summary = $entity_data.summary,
n.name_embedding = join([x IN coalesce($entity_data.name_embedding, []) | toString(x) ], ",")
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Neptune entity nodes lose custom attributes on save

High Severity

The Neptune query now explicitly sets only name, group_id, created_at, summary, and name_embedding. The previous query used removeKeyFromMap to set all properties from $entity_data, which included custom attributes merged via entity_data.update(self.attributes). Custom entity attributes are now silently dropped on save.

Additional Locations (1)

Fix in Cursor Fix in Web

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Neptune save queries silently drop entity/edge attributes

High Severity

The Neptune save queries were changed from SET n/e = removeKeyFromMap(...) (which wrote all properties from the data dict, including dynamically flattened attributes) to explicit field-by-field SET statements. The explicit statements omit attributes entirely. Since the save methods in nodes.py and edges.py call entity_data.update(self.attributes or {}) to merge attributes as top-level keys, the old approach saved them as graph properties. Now those attribute properties are silently dropped, causing data loss for any Neptune users with custom entity or edge types that define additional attributes.

Additional Locations (2)

Fix in Cursor Fix in Web

RETURN n.uuid AS uuid
"""
case _:
Expand Down Expand Up @@ -214,8 +220,11 @@ def get_entity_node_save_bulk_query(
UNWIND $nodes AS node
MERGE (n:Entity {{uuid: node.uuid}})
{labels}
SET n = removeKeyFromMap(removeKeyFromMap(node, "labels"), "name_embedding")
SET n.name_embedding = join([x IN coalesce(node.name_embedding, []) | toString(x) ], ",")
SET n.name = node.name,
n.group_id = node.group_id,
n.created_at = node.created_at,
n.summary = node.summary,
n.name_embedding = join([x IN coalesce(node.name_embedding, []) | toString(x) ], ",")
RETURN n.uuid AS uuid
"""
)
Expand Down
78 changes: 71 additions & 7 deletions graphiti_core/prompts/dedupe_edges.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,12 @@ class Versions(TypedDict):


def resolve_edge(context: dict[str, Any]) -> list[Message]:
existing_facts_count = len(context.get('existing_edges', []))
invalidation_candidates_count = len(context.get('edge_invalidation_candidates', []))

existing_range = f'0 to {existing_facts_count - 1}' if existing_facts_count > 0 else 'none (empty list)'
invalidation_range = f'0 to {invalidation_candidates_count - 1}' if invalidation_candidates_count > 0 else 'none (empty list)'

return [
Message(
role='system',
Expand All @@ -50,16 +56,28 @@ def resolve_edge(context: dict[str, Any]) -> list[Message]:
Message(
role='user',
content=f"""
You will analyze a NEW FACT against two separate lists of existing facts.

Task:
You will receive TWO lists of facts with CONTINUOUS idx numbering across both lists.
EXISTING FACTS are indexed first, followed by FACT INVALIDATION CANDIDATES.


═══════════════════════════════════════════════════════════════
LIST A: EXISTING FACTS (for duplicate detection)
═══════════════════════════════════════════════════════════════
Count: {existing_facts_count} facts
Valid idx range: {existing_range}

1. DUPLICATE DETECTION:
- If the NEW FACT represents identical factual information as any fact in EXISTING FACTS, return those idx values in duplicate_facts.
- Facts with similar information that contain key differences should NOT be marked as duplicates.
- If no duplicates, return an empty list for duplicate_facts.

2. CONTRADICTION DETECTION:
2. FACT TYPE CLASSIFICATION:
- Given the predefined FACT TYPES, determine if the NEW FACT should be classified as one of these types.
- Return the fact type as fact_type or DEFAULT if NEW FACT is not one of the FACT TYPES.
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Response model missing required fact_type field

Medium Severity

The prompt instructs the LLM to return a fact_type field (documented at lines 73-75 and 135-138), but the EdgeDuplicate response model only defines duplicate_facts and contradicted_facts fields. The LLM-generated fact_type value will either be silently discarded or cause validation errors, making the fact type classification feature non-functional.

Additional Locations (1)

Fix in Cursor Fix in Web


3. CONTRADICTION DETECTION:
- Determine which facts the NEW FACT contradicts from either list.
- A fact from EXISTING FACTS can be both a duplicate AND contradicted (e.g., semantically the same but the new fact updates/supersedes it).
- Return all contradicted idx values in contradicted_facts.
Expand All @@ -74,17 +92,63 @@ def resolve_edge(context: dict[str, Any]) -> list[Message]:
1. Some facts may be very similar but will have key differences, particularly around numeric values.
Do not mark these as duplicates.

<FACT TYPES>
{context['edge_types']}
</FACT TYPES>
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing edge_types in context causes KeyError

High Severity

The prompt template accesses context['edge_types'] at two locations, but the context dictionary built in the resolve_extracted_edge function (edge_operations.py, lines 555-559) only includes existing_edges, new_edge, and edge_invalidation_candidates. When this prompt is called, it will raise a KeyError for the missing edge_types key, causing the edge deduplication to fail.

Additional Locations (1)

Fix in Cursor Fix in Web


<EXISTING FACTS>
{context['existing_edges']}
</EXISTING FACTS>

<FACT INVALIDATION CANDIDATES>
═══════════════════════════════════════════════════════════════
LIST B: FACT INVALIDATION CANDIDATES (for contradiction detection)
═══════════════════════════════════════════════════════════════
Count: {invalidation_candidates_count} facts
Valid idx range: {invalidation_range}

{context['edge_invalidation_candidates']}
</FACT INVALIDATION CANDIDATES>

<NEW FACT>
═══════════════════════════════════════════════════════════════
NEW FACT TO ANALYZE
═══════════════════════════════════════════════════════════════
{context['new_edge']}
</NEW FACT>

═══════════════════════════════════════════════════════════════
FACT TYPES FOR CLASSIFICATION
═══════════════════════════════════════════════════════════════
{context['edge_types']}

═══════════════════════════════════════════════════════════════
YOUR RESPONSE MUST INCLUDE THREE FIELDS
═══════════════════════════════════════════════════════════════

1. duplicate_facts (list of integers)
SOURCE: Use idx values ONLY from LIST A (EXISTING FACTS)
VALID RANGE: {existing_range}
PURPOSE: Identify which facts in LIST A are duplicates of the NEW FACT
CRITERIA: Facts must represent identical factual information (minor wording differences OK)
NOTE: Facts with key differences (especially numeric values) are NOT duplicates
IF NO DUPLICATES: Return empty list []

2. contradicted_facts (list of integers)
SOURCE: Use idx values ONLY from LIST B (FACT INVALIDATION CANDIDATES)
VALID RANGE: {invalidation_range}
PURPOSE: Identify which facts in LIST B are contradicted by the NEW FACT
CRITERIA: Facts that are logically incompatible with the NEW FACT
IF NO CONTRADICTIONS: Return empty list []

3. fact_type (string)
SOURCE: Choose from FACT TYPES listed above
PURPOSE: Classify the NEW FACT's type
DEFAULT: Return 'DEFAULT' if NEW FACT doesn't match any predefined FACT TYPES

═══════════════════════════════════════════════════════════════
CRITICAL WARNINGS
═══════════════════════════════════════════════════════════════
- LIST A and LIST B are COMPLETELY SEPARATE with INDEPENDENT indexing
- Do NOT use idx values from LIST B in duplicate_facts field
- Do NOT use idx values from LIST A in contradicted_facts field
- Each list starts indexing from 0 independently
- Verify your idx values are within the valid ranges specified above
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Prompt contradicts data indexing and consuming code

High Severity

The new prompt has contradictory indexing instructions. The data passed uses continuous idx numbering (invalidation candidates start where existing facts end), and the consuming code in edge_operations.py expects this; the prompt's own opening task description also states that the two lists use CONTINUOUS idx numbering. However, the "CRITICAL WARNINGS" section says the lists have "INDEPENDENT indexing" starting from 0, and that contradicted_facts must only use LIST B indices. In addition, invalidation_range (line 48) is computed as a 0-based range, which doesn't match the actual offset-based idx values in the data. This will cause the LLM to return incorrect idx values, leading to wrong edges being invalidated.

Fix in Cursor Fix in Web

""",
),
]
Expand Down
Loading