diff --git a/crates/lance-context-api/src/lib.rs b/crates/lance-context-api/src/lib.rs index e84eec5..246c587 100644 --- a/crates/lance-context-api/src/lib.rs +++ b/crates/lance-context-api/src/lib.rs @@ -178,6 +178,10 @@ pub struct AddRecordRequest { #[serde(default, skip_serializing_if = "Option::is_none")] pub session_id: Option, #[serde(default, skip_serializing_if = "Option::is_none")] + pub tenant: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub source: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] pub external_id: Option, #[serde(default, skip_serializing_if = "Option::is_none")] pub state_metadata: Option, @@ -228,6 +232,10 @@ pub struct RecordPatchDto { #[serde(default, skip_serializing_if = "Option::is_none")] pub session_id: Option, #[serde(default, skip_serializing_if = "Option::is_none")] + pub tenant: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub source: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] pub state_metadata: Option, #[serde(default, skip_serializing_if = "Option::is_none")] pub metadata: Option, @@ -252,6 +260,8 @@ impl RecordPatchDto { pub fn is_empty(&self) -> bool { self.bot_id.is_none() && self.session_id.is_none() + && self.tenant.is_none() + && self.source.is_none() && self.state_metadata.is_none() && self.metadata.is_none() && self.relationships.is_none() @@ -294,6 +304,10 @@ pub struct RecordDto { pub bot_id: Option, #[serde(default, skip_serializing_if = "Option::is_none")] pub session_id: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub tenant: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub source: Option, pub created_at: DateTime, pub role: String, pub content_type: String, diff --git a/crates/lance-context-core/src/api_impl.rs b/crates/lance-context-core/src/api_impl.rs index 23943e7..f287484 100644 --- a/crates/lance-context-core/src/api_impl.rs +++ b/crates/lance-context-core/src/api_impl.rs @@ -335,6 +335,8 @@ fn patch_from_dto(patch: &RecordPatchDto) -> RecordPatch { RecordPatch { bot_id: patch.bot_id.clone(), session_id: patch.session_id.clone(), + tenant: patch.tenant.clone(), + source: patch.source.clone(), state_metadata: patch.state_metadata.as_ref().map(|sm| StateMetadata { step: sm.step, active_plan_id: sm.active_plan_id.clone(), @@ -365,6 +367,8 @@ fn record_from_add_request(r: &AddRecordRequest, id: String, run_id: String) -> run_id, bot_id: r.bot_id.clone(), session_id: r.session_id.clone(), + tenant: r.tenant.clone(), + source: r.source.clone(), created_at: Utc::now(), role: r.role.clone(), state_metadata: r.state_metadata.as_ref().map(|sm| StateMetadata { @@ -401,6 +405,8 @@ fn record_to_dto(r: ContextRecord) -> RecordDto { run_id: r.run_id, bot_id: r.bot_id, session_id: r.session_id, + tenant: r.tenant, + source: r.source, created_at: r.created_at, role: r.role, content_type: r.content_type, diff --git a/crates/lance-context-core/src/lib.rs b/crates/lance-context-core/src/lib.rs index 160eac3..6024167 100644 --- a/crates/lance-context-core/src/lib.rs +++ b/crates/lance-context-core/src/lib.rs @@ -3,11 +3,13 @@ mod api_impl; mod context; +mod namespace; mod record; pub mod serde; mod store; pub use context::{Context, ContextEntry, Snapshot}; +pub use namespace::{ContextNamespace, PartitionInfo, PartitionSelector, PartitionSpec}; pub use record::{ ContextRecord, LifecycleQueryOptions, MetadataFilter, RecordFilters, RecordPatch, Relationship, RetrieveResult, SearchResult, StateMetadata, UpdateResult, UpsertResult, LIFECYCLE_ACTIVE, diff --git a/crates/lance-context-core/src/namespace.rs b/crates/lance-context-core/src/namespace.rs new file mode 100644 index 0000000..f022371 --- /dev/null +++ b/crates/lance-context-core/src/namespace.rs @@ -0,0 +1,550 @@ +use std::collections::{BTreeMap, HashMap, HashSet}; +use std::sync::Arc; + +use arrow_array::{ + Array, Int32Array, LargeStringArray, RecordBatch, RecordBatchIterator, StringArray, +}; +use arrow_schema::{ArrowError, DataType, Field, Schema}; +use futures::TryStreamExt; +use lance::dataset::{builder::DatasetBuilder, Dataset, WriteMode, WriteParams}; +use lance::io::{ObjectStoreParams, StorageOptionsAccessor}; +use lance::{Error as LanceError, Result as LanceResult}; +use uuid::Uuid; + +use crate::store::{ContextStore, ContextStoreOptions}; + +const MANIFEST_TABLE_NAME: &str = "__manifest"; +const PARTITION_TABLE_NAME: &str = "dataset"; + +/// Complete selector values for one context partition. +pub type PartitionSelector = BTreeMap; + +/// Phase-1 partition specification for a context namespace. +/// +/// This intentionally models only identity partition fields. More advanced +/// Lance partition transforms (`bucket`, `truncate`, time transforms) can be +/// layered in once the namespace resolver grows past the single-partition path. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct PartitionSpec { + version: i32, + fields: Vec, +} + +impl PartitionSpec { + /// Create a v1 identity partition spec from ordered field names. + pub fn new(fields: I) -> LanceResult + where + I: IntoIterator, + S: Into, + { + Self::with_version(1, fields) + } + + /// Create an identity partition spec with an explicit version. + pub fn with_version(version: i32, fields: I) -> LanceResult + where + I: IntoIterator, + S: Into, + { + if version <= 0 { + return Err(invalid_input("partition spec version must be positive")); + } + + let mut seen = HashSet::new(); + let mut normalized = Vec::new(); + for field in fields { + let field = field.into(); + if field.trim().is_empty() { + return Err(invalid_input("partition field names must be non-empty")); + } + if !seen.insert(field.clone()) { + return Err(invalid_input(format!( + "duplicate partition field '{field}'" + ))); + } + normalized.push(field); + } + if normalized.is_empty() { + return Err(invalid_input("partition spec requires at least one field")); + } + + Ok(Self { + version, + fields: normalized, + }) + } + + #[must_use] + pub fn tenant() -> Self { + Self { + version: 1, + fields: vec!["tenant".to_string()], + } + } + + #[must_use] + pub fn tenant_source() -> Self { + Self { + version: 1, + fields: vec!["tenant".to_string(), "source".to_string()], + } + } + + #[must_use] + pub fn version(&self) -> i32 { + self.version + } + + #[must_use] + pub fn fields(&self) -> &[String] { + &self.fields + } +} + +/// Manifest row describing one resolved context partition. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct PartitionInfo { + pub partition_id: String, + pub spec_version: i32, + pub selector: PartitionSelector, + pub dataset_uri: String, +} + +/// Thin resolver for scoped context partitions under one namespace root. +/// +/// Phase 1 supports opening exactly one partition from a complete selector. The +/// returned value is today's [`ContextStore`], preserving existing per-context +/// behavior while putting each selector in a physically separate Lance dataset. +#[derive(Debug, Clone)] +pub struct ContextNamespace { + root_uri: String, + partition_spec: PartitionSpec, + context_options: ContextStoreOptions, +} + +impl ContextNamespace { + /// Create or open a namespace root with default context-store options. + pub async fn create( + root_uri: impl Into, + partition_spec: PartitionSpec, + ) -> LanceResult { + Self::create_with_options(root_uri, partition_spec, ContextStoreOptions::default()).await + } + + /// Create or open a namespace root with explicit context-store options. + pub async fn create_with_options( + root_uri: impl Into, + partition_spec: PartitionSpec, + context_options: ContextStoreOptions, + ) -> LanceResult { + let namespace = Self::new(root_uri, partition_spec, context_options)?; + namespace.ensure_manifest().await?; + Ok(namespace) + } + + /// Build a namespace resolver without touching storage. + pub fn new( + root_uri: impl Into, + partition_spec: PartitionSpec, + context_options: ContextStoreOptions, + ) -> LanceResult { + let root_uri = normalize_root_uri(root_uri.into())?; + Ok(Self { + root_uri, + partition_spec, + context_options, + }) + } + + #[must_use] + pub fn root_uri(&self) -> &str { + &self.root_uri + } + + #[must_use] + pub fn partition_spec(&self) -> &PartitionSpec { + &self.partition_spec + } + + #[must_use] + pub fn manifest_uri(&self) -> String { + join_uri(&self.root_uri, MANIFEST_TABLE_NAME) + } + + /// Resolve a complete selector to its opaque partition dataset URI. + pub fn resolve_partition(&self, selector: &PartitionSelector) -> LanceResult { + self.validate_selector(selector)?; + let selector_json = selector_json(selector)?; + let partition_id = + partition_id(&self.root_uri, self.partition_spec.version, &selector_json); + let dataset_uri = join_uri( + &self.root_uri, + &format!( + "v{}/{}/{}", + self.partition_spec.version, partition_id, PARTITION_TABLE_NAME + ), + ); + + Ok(PartitionInfo { + partition_id, + spec_version: self.partition_spec.version, + selector: selector.clone(), + dataset_uri, + }) + } + + /// Open one partition as a normal [`ContextStore`]. + pub async fn context(&self, selector: &PartitionSelector) -> LanceResult { + let partition = self.resolve_partition(selector)?; + self.ensure_manifest_entry(&partition).await?; + ContextStore::open_with_options(&partition.dataset_uri, self.context_options.clone()).await + } + + /// List partition mappings currently recorded in `__manifest`. + pub async fn partitions(&self) -> LanceResult> { + let manifest = self.ensure_manifest().await?; + read_manifest(&manifest).await + } + + async fn ensure_manifest(&self) -> LanceResult { + match load_dataset(&self.manifest_uri(), self.context_options.storage_options()).await { + Ok(dataset) => Ok(dataset), + Err(LanceError::DatasetNotFound { .. }) => { + create_manifest(&self.manifest_uri(), self.context_options.storage_options()).await + } + Err(err) => Err(err), + } + } + + async fn ensure_manifest_entry(&self, partition: &PartitionInfo) -> LanceResult<()> { + let manifest = self.ensure_manifest().await?; + for existing in read_manifest(&manifest).await? { + if existing.partition_id == partition.partition_id { + if existing == *partition { + return Ok(()); + } + return Err(invalid_input(format!( + "partition id '{}' already maps to a different selector", + partition.partition_id + ))); + } + } + + append_manifest_entry( + &self.manifest_uri(), + self.context_options.storage_options(), + partition, + ) + .await + } + + fn validate_selector(&self, selector: &PartitionSelector) -> LanceResult<()> { + for field in &self.partition_spec.fields { + match selector.get(field) { + Some(value) if !value.is_empty() => {} + Some(_) => { + return Err(invalid_input(format!( + "partition selector value for '{field}' must be non-empty" + ))); + } + None => { + return Err(invalid_input(format!( + "partition selector is missing required field '{field}'" + ))); + } + } + } + + for field in selector.keys() { + if !self + .partition_spec + .fields + .iter() + .any(|expected| expected == field) + { + return Err(invalid_input(format!( + "partition selector contains unknown field '{field}'" + ))); + } + } + + Ok(()) + } +} + +fn manifest_schema() -> Arc { + Arc::new(Schema::new(vec![ + Field::new("partition_id", DataType::Utf8, false), + Field::new("spec_version", DataType::Int32, false), + Field::new("selector_json", DataType::LargeUtf8, false), + Field::new("dataset_uri", DataType::Utf8, false), + ])) +} + +async fn load_dataset( + uri: &str, + storage_options: Option>, +) -> LanceResult { + if let Some(options) = storage_options { + DatasetBuilder::from_uri(uri) + .with_storage_options(options) + .load() + .await + } else { + Dataset::open(uri).await + } +} + +async fn create_manifest( + uri: &str, + storage_options: Option>, +) -> LanceResult { + let schema = manifest_schema(); + let empty_batch = RecordBatch::new_empty(schema.clone()); + let batches = RecordBatchIterator::new( + vec![Ok::(empty_batch)].into_iter(), + schema, + ); + let params = write_params(WriteMode::Create, storage_options); + Dataset::write(batches, uri, Some(params)).await +} + +async fn append_manifest_entry( + uri: &str, + storage_options: Option>, + partition: &PartitionInfo, +) -> LanceResult<()> { + let selector_json = selector_json(&partition.selector)?; + let schema = manifest_schema(); + let batch = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(StringArray::from(vec![partition.partition_id.as_str()])), + Arc::new(Int32Array::from(vec![partition.spec_version])), + Arc::new(LargeStringArray::from(vec![selector_json.as_str()])), + Arc::new(StringArray::from(vec![partition.dataset_uri.as_str()])), + ], + )?; + let batches = RecordBatchIterator::new( + vec![Ok::(batch)].into_iter(), + schema, + ); + let params = write_params(WriteMode::Append, storage_options); + Dataset::write(batches, uri, Some(params)).await?; + Ok(()) +} + +fn write_params(mode: WriteMode, storage_options: Option>) -> WriteParams { + let mut params = WriteParams { + mode, + ..Default::default() + }; + + if let Some(options) = storage_options { + let store_params = ObjectStoreParams { + storage_options_accessor: Some(Arc::new(StorageOptionsAccessor::with_static_options( + options, + ))), + ..Default::default() + }; + params.store_params = Some(store_params); + } + + params +} + +async fn read_manifest(dataset: &Dataset) -> LanceResult> { + let mut scanner = dataset.scan(); + scanner.project(&[ + "partition_id", + "spec_version", + "selector_json", + "dataset_uri", + ])?; + let mut stream = scanner.try_into_stream().await?; + let mut partitions = Vec::new(); + while let Some(batch) = stream.try_next().await? { + partitions.extend(manifest_batch_to_partitions(&batch)?); + } + Ok(partitions) +} + +fn manifest_batch_to_partitions(batch: &RecordBatch) -> LanceResult> { + let partition_id = column_as::(batch, "partition_id")?; + let spec_version = column_as::(batch, "spec_version")?; + let selector_json = column_as::(batch, "selector_json")?; + let dataset_uri = column_as::(batch, "dataset_uri")?; + + let mut partitions = Vec::with_capacity(batch.num_rows()); + for row in 0..batch.num_rows() { + partitions.push(PartitionInfo { + partition_id: partition_id.value(row).to_string(), + spec_version: spec_version.value(row), + selector: serde_json::from_str(selector_json.value(row)).map_err(|err| { + LanceError::from(ArrowError::InvalidArgumentError(format!( + "invalid partition selector JSON in manifest: {err}" + ))) + })?, + dataset_uri: dataset_uri.value(row).to_string(), + }); + } + Ok(partitions) +} + +fn column_as<'a, A>(batch: &'a RecordBatch, name: &str) -> LanceResult<&'a A> +where + A: Array + 'static, +{ + batch + .column_by_name(name) + .and_then(|col| col.as_ref().as_any().downcast_ref::()) + .ok_or_else(|| { + LanceError::from(ArrowError::InvalidArgumentError(format!( + "column '{name}' has unexpected data type" + ))) + }) +} + +fn selector_json(selector: &PartitionSelector) -> LanceResult { + serde_json::to_string(selector).map_err(|err| { + LanceError::from(ArrowError::InvalidArgumentError(format!( + "partition selector is not JSON serializable: {err}" + ))) + }) +} + +fn partition_id(root_uri: &str, spec_version: i32, selector_json: &str) -> String { + let key = format!("{root_uri}\n{spec_version}\n{selector_json}"); + let uuid = Uuid::new_v5(&Uuid::NAMESPACE_URL, key.as_bytes()) + .simple() + .to_string(); + uuid[..16].to_string() +} + +fn normalize_root_uri(root_uri: String) -> LanceResult { + let root_uri = root_uri.trim(); + if root_uri.is_empty() { + return Err(invalid_input("namespace root URI must be non-empty")); + } + if root_uri == "/" { + return Ok(root_uri.to_string()); + } + Ok(root_uri.trim_end_matches('/').to_string()) +} + +fn join_uri(root_uri: &str, child: &str) -> String { + if root_uri == "/" { + format!("/{child}") + } else { + format!("{root_uri}/{child}") + } +} + +fn invalid_input(message: impl Into) -> LanceError { + LanceError::from(ArrowError::InvalidArgumentError(message.into())) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::record::{ContextRecord, LIFECYCLE_ACTIVE}; + use crate::serde::CONTENT_TYPE_TEXT; + use chrono::Utc; + use tempfile::TempDir; + + fn selector(pairs: &[(&str, &str)]) -> PartitionSelector { + pairs + .iter() + .map(|(key, value)| ((*key).to_string(), (*value).to_string())) + .collect() + } + + fn record(id: &str, tenant: &str, source: &str) -> ContextRecord { + ContextRecord { + id: id.to_string(), + external_id: None, + run_id: format!("run-{id}"), + bot_id: Some("support-bot".to_string()), + session_id: None, + tenant: Some(tenant.to_string()), + source: Some(source.to_string()), + created_at: Utc::now(), + role: "user".to_string(), + state_metadata: None, + metadata: None, + relationships: Vec::new(), + expires_at: None, + retention_policy: None, + lifecycle_status: LIFECYCLE_ACTIVE.to_string(), + retired_at: None, + retired_reason: None, + supersedes_id: None, + superseded_by_id: None, + content_type: CONTENT_TYPE_TEXT.to_string(), + text_payload: Some("hello".to_string()), + binary_payload: None, + embedding: None, + } + } + + #[tokio::test] + async fn namespace_context_records_manifest_and_opens_partition() { + let dir = TempDir::new().unwrap(); + let root_uri = dir.path().to_string_lossy().to_string(); + let namespace = ContextNamespace::create(root_uri, PartitionSpec::tenant_source()) + .await + .unwrap(); + let selector = selector(&[("tenant", "acme"), ("source", "memory")]); + let partition = namespace.resolve_partition(&selector).unwrap(); + + let mut store = namespace.context(&selector).await.unwrap(); + store + .add(&[record("rec-1", "acme", "memory")]) + .await + .unwrap(); + + let records = store + .list_filtered( + None, + None, + Some(&crate::record::RecordFilters { + tenant: Some("acme".to_string()), + source: Some("memory".to_string()), + ..Default::default() + }), + ) + .await + .unwrap(); + assert_eq!(records.len(), 1); + + let partitions = namespace.partitions().await.unwrap(); + assert_eq!(partitions, vec![partition]); + + namespace.context(&selector).await.unwrap(); + assert_eq!(namespace.partitions().await.unwrap().len(), 1); + } + + #[test] + fn namespace_rejects_partial_or_unknown_selectors() { + let namespace = ContextNamespace::new( + "/tmp/context-ns", + PartitionSpec::tenant_source(), + ContextStoreOptions::default(), + ) + .unwrap(); + + let err = namespace + .resolve_partition(&selector(&[("tenant", "acme")])) + .unwrap_err(); + assert!(err.to_string().contains("source")); + + let err = namespace + .resolve_partition(&selector(&[ + ("tenant", "acme"), + ("source", "memory"), + ("session_id", "s1"), + ])) + .unwrap_err(); + assert!(err.to_string().contains("session_id")); + } +} diff --git a/crates/lance-context-core/src/record.rs b/crates/lance-context-core/src/record.rs index 71306d6..5629e19 100644 --- a/crates/lance-context-core/src/record.rs +++ b/crates/lance-context-core/src/record.rs @@ -34,6 +34,8 @@ pub struct ContextRecord { pub run_id: String, pub bot_id: Option, pub session_id: Option, + pub tenant: Option, + pub source: Option, pub created_at: DateTime, pub role: String, pub state_metadata: Option, @@ -157,6 +159,8 @@ pub struct UpsertResult { pub struct RecordPatch { pub bot_id: Option, pub session_id: Option, + pub tenant: Option, + pub source: Option, pub state_metadata: Option, pub metadata: Option, pub relationships: Option>, @@ -175,6 +179,8 @@ impl RecordPatch { pub fn is_empty(&self) -> bool { self.bot_id.is_none() && self.session_id.is_none() + && self.tenant.is_none() + && self.source.is_none() && self.state_metadata.is_none() && self.metadata.is_none() && self.relationships.is_none() @@ -207,6 +213,8 @@ pub enum MetadataFilter { pub struct RecordFilters { pub bot_id: Option, pub session_id: Option, + pub tenant: Option, + pub source: Option, pub role: Option, pub content_type: Option, pub created_at_start: Option>, @@ -225,6 +233,8 @@ impl RecordFilters { match key.as_str() { "bot_id" => filters.bot_id = filter_string(key.as_str(), value)?, "session_id" => filters.session_id = filter_string(key.as_str(), value)?, + "tenant" => filters.tenant = filter_string(key.as_str(), value)?, + "source" => filters.source = filter_string(key.as_str(), value)?, "role" => filters.role = filter_string(key.as_str(), value)?, "content_type" => filters.content_type = filter_string(key.as_str(), value)?, "created_at" => apply_created_at_filter(&mut filters, value)?, @@ -255,6 +265,8 @@ impl RecordFilters { pub fn is_empty(&self) -> bool { self.bot_id.is_none() && self.session_id.is_none() + && self.tenant.is_none() + && self.source.is_none() && self.role.is_none() && self.content_type.is_none() && self.created_at_start.is_none() @@ -278,6 +290,12 @@ impl RecordFilters { { return false; } + if !matches_typed_or_metadata(record, "tenant", record.tenant.as_deref(), &self.tenant) { + return false; + } + if !matches_typed_or_metadata(record, "source", record.source.as_deref(), &self.source) { + return false; + } if self .role .as_deref() @@ -369,6 +387,24 @@ fn metadata_contains(value: &Value, expected: &Value) -> bool { } } +fn matches_typed_or_metadata( + record: &ContextRecord, + metadata_key: &str, + typed_value: Option<&str>, + expected: &Option, +) -> bool { + let Some(expected) = expected.as_deref() else { + return true; + }; + if typed_value.is_some() { + return typed_value == Some(expected); + } + let Some(Value::Object(metadata)) = &record.metadata else { + return false; + }; + metadata.get(metadata_key) == Some(&Value::String(expected.to_string())) +} + #[cfg(test)] mod tests { use super::*; @@ -382,6 +418,8 @@ mod tests { run_id: "run-1".to_string(), bot_id: Some("support-bot".to_string()), session_id: Some("incident-1".to_string()), + tenant: Some("acme".to_string()), + source: Some("memory".to_string()), created_at: Utc.with_ymd_and_hms(2026, 6, 9, 3, 0, 0).unwrap(), role: "assistant".to_string(), state_metadata: None, @@ -410,6 +448,8 @@ mod tests { let mut filters = RecordFilters::from_json_value(json!({ "bot_id": "support-bot", "session_id": "incident-1", + "tenant": "acme", + "source": "memory", "role": "assistant", "content_type": "text/plain", "created_at": { @@ -426,4 +466,22 @@ mod tests { filters.session_id = Some("other".to_string()); assert!(!filters.matches(&record())); } + + #[test] + fn tenant_and_source_filters_fall_back_to_legacy_metadata() { + let mut record = record(); + record.tenant = None; + record.source = None; + record.metadata = Some(json!({ + "tenant": "acme", + "source": "memory" + })); + + let filters = RecordFilters::from_json_value(json!({ + "tenant": "acme", + "source": "memory" + })) + .unwrap(); + assert!(filters.matches(&record)); + } } diff --git a/crates/lance-context-core/src/store.rs b/crates/lance-context-core/src/store.rs index 693bf98..19556fd 100644 --- a/crates/lance-context-core/src/store.rs +++ b/crates/lance-context-core/src/store.rs @@ -595,6 +595,12 @@ impl ContextStore { if let Some(session_id) = patch.session_id { record.session_id = Some(session_id); } + if let Some(tenant) = patch.tenant { + record.tenant = Some(tenant); + } + if let Some(source) = patch.source { + record.source = Some(source); + } if let Some(state_metadata) = patch.state_metadata { record.state_metadata = Some(state_metadata); } @@ -639,6 +645,8 @@ impl ContextStore { run_id: record.run_id, bot_id: record.bot_id, session_id: record.session_id, + tenant: record.tenant, + source: record.source, created_at: Utc::now(), role: record.role, state_metadata: None, @@ -1392,6 +1400,8 @@ impl ContextStore { Field::new("run_id", DataType::Utf8, false), Field::new("bot_id", DataType::Utf8, true), Field::new("session_id", DataType::Utf8, true), + Field::new("tenant", DataType::Utf8, true), + Field::new("source", DataType::Utf8, true), Field::new( "created_at", DataType::Timestamp(TimeUnit::Microsecond, None), @@ -1536,6 +1546,18 @@ impl ContextStore { .field_paths() .iter() .any(|path| path == "metadata"); + let include_tenant = self + .dataset + .schema() + .field_paths() + .iter() + .any(|path| path == "tenant"); + let include_source = self + .dataset + .schema() + .field_paths() + .iter() + .any(|path| path == "source"); let include_relationships = self.has_relationships_column(); if !include_external_id && entries.iter().any(|entry| entry.external_id.is_some()) { return Err(ArrowError::InvalidArgumentError( @@ -1550,6 +1572,20 @@ impl ContextStore { ) .into()); } + if !include_tenant && entries.iter().any(|entry| entry.tenant.is_some()) { + return Err(ArrowError::InvalidArgumentError( + "tenant requires a context dataset created with partition-key column support" + .to_string(), + ) + .into()); + } + if !include_source && entries.iter().any(|entry| entry.source.is_some()) { + return Err(ArrowError::InvalidArgumentError( + "source requires a context dataset created with partition-key column support" + .to_string(), + ) + .into()); + } if !include_relationships && entries.iter().any(|entry| !entry.relationships.is_empty()) { return Err(ArrowError::InvalidArgumentError( "relationships require a context dataset with relationships support; run migrate_relationships_column() on older datasets".to_string(), @@ -1569,6 +1605,8 @@ impl ContextStore { let mut run_id_builder = StringBuilder::new(); let mut bot_id_builder = StringBuilder::new(); let mut session_id_builder = StringBuilder::new(); + let mut tenant_builder = StringBuilder::new(); + let mut source_builder = StringBuilder::new(); let mut created_at_builder = TimestampMicrosecondBuilder::with_capacity(entries.len()); let mut role_builder = StringDictionaryBuilder::::new(); let mut metadata_builder = LargeStringBuilder::new(); @@ -1621,6 +1659,8 @@ impl ContextStore { run_id_builder.append_value(&entry.run_id); bot_id_builder.append_option(entry.bot_id.as_deref()); session_id_builder.append_option(entry.session_id.as_deref()); + tenant_builder.append_option(entry.tenant.as_deref()); + source_builder.append_option(entry.source.as_deref()); created_at_builder.append_value(entry.created_at.timestamp_micros()); role_builder.append(&entry.role)?; match &entry.metadata { @@ -1744,6 +1784,8 @@ impl ContextStore { let run_id_array: ArrayRef = Arc::new(run_id_builder.finish()); let bot_id_array: ArrayRef = Arc::new(bot_id_builder.finish()); let session_id_array: ArrayRef = Arc::new(session_id_builder.finish()); + let tenant_array: ArrayRef = Arc::new(tenant_builder.finish()); + let source_array: ArrayRef = Arc::new(source_builder.finish()); let created_at_array: ArrayRef = Arc::new(created_at_builder.finish()); let role_array: ArrayRef = Arc::new(role_builder.finish()); let metadata_array: ArrayRef = Arc::new(metadata_builder.finish()); @@ -1777,6 +1819,12 @@ impl ContextStore { ("role".to_string(), role_array), ("state_metadata".to_string(), state_array), ]); + if include_tenant { + arrays_by_name.insert("tenant".to_string(), tenant_array); + } + if include_source { + arrays_by_name.insert("source".to_string(), source_array); + } if include_metadata { arrays_by_name.insert("metadata".to_string(), metadata_array); } @@ -1838,6 +1886,8 @@ fn batch_to_records(batch: &RecordBatch) -> LanceResult> { let run_id_array = column_as::(batch, "run_id")?; let bot_id_array = column_as_optional::(batch, "bot_id"); let session_id_array = column_as_optional::(batch, "session_id"); + let tenant_array = column_as_optional::(batch, "tenant"); + let source_array = column_as_optional::(batch, "source"); let created_at_array = column_as::(batch, "created_at")?; let role_array = column_as::>(batch, "role")?; let state_array = column_as::(batch, "state_metadata")?; @@ -2005,6 +2055,22 @@ fn batch_to_records(batch: &RecordBatch) -> LanceResult> { } }); + let tenant = tenant_array.and_then(|arr| { + if arr.is_null(row) { + None + } else { + Some(arr.value(row).to_string()) + } + }); + + let source = source_array.and_then(|arr| { + if arr.is_null(row) { + None + } else { + Some(arr.value(row).to_string()) + } + }); + let metadata = match metadata_array { Some(arr) if !arr.is_null(row) => { Some(serde_json::from_str(arr.value(row)).map_err(|err| { @@ -2042,6 +2108,8 @@ fn batch_to_records(batch: &RecordBatch) -> LanceResult> { run_id: run_id_array.value(row).to_string(), bot_id, session_id, + tenant, + source, created_at, role, state_metadata, @@ -2436,6 +2504,8 @@ mod tests { run_id: format!("run-{id}"), bot_id: None, session_id: None, + tenant: None, + source: None, created_at: Utc::now(), role: "user".to_string(), state_metadata: Some(StateMetadata { diff --git a/crates/lance-context-server/src/routes/records.rs b/crates/lance-context-server/src/routes/records.rs index 8fcc616..e1a3ff6 100644 --- a/crates/lance-context-server/src/routes/records.rs +++ b/crates/lance-context-server/src/routes/records.rs @@ -353,6 +353,8 @@ pub fn record_to_dto(r: ContextRecord) -> RecordDto { run_id: r.run_id, bot_id: r.bot_id, session_id: r.session_id, + tenant: r.tenant, + source: r.source, created_at: r.created_at, role: r.role, content_type: r.content_type, @@ -401,6 +403,8 @@ fn patch_from_dto(patch: &RecordPatchDto) -> RecordPatch { RecordPatch { bot_id: patch.bot_id.clone(), session_id: patch.session_id.clone(), + tenant: patch.tenant.clone(), + source: patch.source.clone(), state_metadata: patch.state_metadata.as_ref().map(|sm| StateMetadata { step: sm.step, active_plan_id: sm.active_plan_id.clone(), @@ -435,6 +439,8 @@ fn record_from_add_request( run_id, bot_id: r.bot_id.clone(), session_id: r.session_id.clone(), + tenant: r.tenant.clone(), + source: r.source.clone(), created_at: Utc::now(), role: r.role.clone(), state_metadata: r.state_metadata.as_ref().map(|sm| StateMetadata { diff --git a/crates/lance-context/src/lib.rs b/crates/lance-context/src/lib.rs index f3007f6..17077d7 100644 --- a/crates/lance-context/src/lib.rs +++ b/crates/lance-context/src/lib.rs @@ -3,10 +3,10 @@ // Explicit re-exports from core (no glob to avoid recursion depth overflow) pub use lance_context_core::serde; pub use lance_context_core::{ - CompactionConfig, CompactionMetrics, CompactionStats, Context, ContextEntry, ContextRecord, - ContextStoreOptions, IdIndexType, LifecycleQueryOptions, MetadataFilter, RecordFilters, - Relationship, RetrieveResult, SearchResult, Snapshot, StateMetadata, LIFECYCLE_ACTIVE, - LIFECYCLE_CONTRADICTED, + CompactionConfig, CompactionMetrics, CompactionStats, Context, ContextEntry, ContextNamespace, + ContextRecord, ContextStoreOptions, IdIndexType, LifecycleQueryOptions, MetadataFilter, + PartitionInfo, PartitionSelector, PartitionSpec, RecordFilters, Relationship, RetrieveResult, + SearchResult, Snapshot, StateMetadata, LIFECYCLE_ACTIVE, LIFECYCLE_CONTRADICTED, }; pub use lance_context_api::{ diff --git a/docs/design/partitioned-namespace.md b/docs/design/partitioned-namespace.md new file mode 100644 index 0000000..3eaac08 --- /dev/null +++ b/docs/design/partitioned-namespace.md @@ -0,0 +1,470 @@ +# Design: Partitioned-Namespace Layout for Scoped Agent Context + +| | | +|---|---| +| **Status** | Draft — for discussion | +| **Issue** | [#90 — Explore partitioned namespace layout for scoped agent context](https://github.com/lance-format/lance-context/issues/90) | +| **Supersedes / refines** | [#37 — Change the management of data tables and add some interface exposure for agent](https://github.com/lance-format/lance-context/issues/37) | +| **References** | [Lance Partitioning Spec](https://lance.org/format/namespace/partitioning-spec/) · [Lance Namespace Spec](https://lance.org/format/namespace/) | +| **Scope** | Conventions + a thin helper API. No new storage engine, no catalog/database. | + +## 1. Summary + +`lance-context` today is a **single fat table**: one Lance dataset per context URI, holding +one `ContextRecord` schema that already carries scope-ish fields (`bot_id`, `session_id`, +`run_id`), a free-form `metadata` JSON blob, embeddings, relationships, and lifecycle +columns. Scoping is done entirely with **metadata filters inside that one dataset**. + +This works well until a deployment needs *physical* isolation between scopes — separate +tenants, agents, environments, or content "directions" (memory vs. knowledge base vs. +skills) that want independent lifecycle, compaction, vector indexes, deletion, or access +control. The natural temptation (and the original direction in #37) is to grow into many +purpose-built tables (`ctx_agent`, `ctx_kb`, `ctx_meta`) plus an `information_schema`-style +registry. Maintainer guidance on #37 pushed back on that: + +> I'm hesitating to make lance-context as a database with an information_schema. +> Maintaining the schema of each table would be a pain in the future. +> Any chance to use the partitioned namespace instead of making it as a database? + +This document takes that suggestion seriously. It proposes that scoped isolation be +expressed as a **Lance Partitioned Namespace** over the **existing single `ContextRecord` +schema**, rather than as a family of bespoke tables with a hand-rolled catalog. A +partitioned namespace is, by definition, *"a collection of tables that share a common +schema … physically separated and independent, but logically related through partition +fields"* — which is exactly the shape we want: **one schema to maintain, many physically +isolated partitions, and an optional unified cross-partition view.** + +The deliverable here is **conventions + a small, optional URI/namespace helper** — not a +catalog, not table families, not an `information_schema`. + +## 2. Background + +### 2.1 Where we are today + +A `Context` is created over a single URI and is backed by exactly one Lance dataset: + +```python +ctx = Context.create("s3://bucket/prefix/context.lance") +ctx.add("user", "…", bot_id="support-bot", session_id="incident-123", + metadata={"tenant": "acme", "scope": "team", "source_uri": "docs://…"}) +ctx.list(filters={"bot_id": "support-bot", "session_id": "incident-123", + "scope": "team"}) +``` + +Scope is carried two ways: + +- **Typed columns** that already exist on `ContextRecord`: `bot_id`, `session_id`, + `run_id`, `role`, `content_type`, plus lifecycle columns. +- **Free-form `metadata` JSON** (e.g. `tenant`, `scope`, `tags`, `source_uri`), filtered + by `list`/`search`/`retrieve`. + +Everything lives in one dataset. That is simple and correct, and **must stay the default.** + +### 2.2 What #37 was really asking for + +The #37 thread had two distinct ideas tangled together: + +1. **Layering (L0/L1/L2)** — summary / structured+vector / raw. This is a *per-record* + property, orthogonal to isolation. +2. **"Different directions"** — memory, knowledge base, skills, tool calls — which the + author modeled as separate table families (`ctx_agent` / `ctx_kb` / `ctx_meta`) and an + `information_schema` (`ctx_meta`) to register them. + +The maintainer's objection is specifically to #2: a growing set of tables, each with its +own schema, plus a registry we have to maintain, turns `lance-context` into a small +database. The cost is schema sprawl and migration pain. + +### 2.3 Why a partitioned namespace fits + +The Lance Partitioning Spec already solves "many physically separate tables that share one +schema and one lifecycle, queryable together when needed": + +- All partitions **share a single namespace schema**; the spec *requires* that "all + partition table schemas must be consistent with each other, as well as with the + namespace schema." → **one schema to maintain**, which is the whole point of #37's + pushback. +- The registry is the spec's own **`__manifest` table**, maintained by the namespace + format — not a bespoke `information_schema` we own. +- Each partition is **a standard, independently accessible Lance `Dataset`** — so a single + partition is exactly today's `Context`, unchanged. +- Cross-partition queries are supported via **partition pruning** over `__manifest`. + +So we get physical isolation *and* a unified view, without becoming a database. + +## 3. Goals / Non-goals + +Mirrors #90. + +**Goals** + +- Recommend upstream-friendly **partition naming / spec conventions** for scoped context + (tenant, agent, session, source, environment). +- Clarify **which scope dimensions belong in path/namespace layout vs. record metadata.** +- Sketch a **small helper API** to resolve/open a partition without a catalog. +- Show **when to use one context with metadata filters vs. separate partitions.** +- Give **migration guidance**: start with one dataset, split by scope later. +- Stay compatible with the single fat-table model and the #37 guidance. + +**Non-goals** + +- No `information_schema`, table-family management, SQL catalog, or database abstraction. +- `lance-context` does **not** own tenant authN/Z or policy enforcement. +- Not a replacement for metadata filters *inside* a single context dataset. +- No new storage engine; no change to the `ContextRecord` payload model. + +## 4. Lance partitioned namespace, in brief + +(Condensed from the [Partitioning Spec](https://lance.org/format/namespace/partitioning-spec/); +read it for the authoritative version.) + +A partitioned namespace is a directory catalog whose `__manifest` table metadata declares +one or more **partition specs**. Logical layout: + +```text +Root Namespace (__manifest Lance table) + metadata: + schema = + partition_spec_v1 = [tenant, agent] + │ + ▼ + Spec Version Level ── v1, v2, … (one per partition-spec version) + │ + ▼ + Partition Namespaces ── one level per partition field, in spec order + │ (names are random 16-char base36 ids) + ▼ + Partition Table ── fixed name "dataset" = a normal Lance Dataset +``` + +Key properties we rely on: + +- **Shared schema.** Field IDs (`lance:field_id`) are stable; partition specs reference + source columns by id, so columns can be renamed without breaking partitioning. +- **Partition spec** = `{ id, fields: [{ field_id, source_ids, transform|expression, + result_type }] }`. Built-in transforms: `identity`, `year`/`month`/`day`/`hour`, + `bucket(N)`, `multi_bucket(N)`, `truncate(W)`; or a custom DataFusion `expression`. +- **Random-id partition namespaces.** Partition *values* are **not** put in the path + (avoids `/`, `=`, `$` issues in tenant/source ids); they live in `__manifest` columns + named `partition_field_{field_id}`. +- **`__manifest` is the registry.** Query engines push predicates onto its partition + columns to prune to the matching `dataset` tables (no path parsing). +- **Partition evolution** via new spec versions (`v2`, …). Old data under `v1` stays + queryable; no rewrite required. +- **Transactions.** A single partition is fully ACID (it is a Lance table). Cross-partition + writes/reads are *not* atomic by default, but the spec defines an optional + `read_version`/`read_branch`/`read_tag` commit protocol on `__manifest` for all-or-nothing + multi-partition visibility. + +## 5. Design + +### 5.1 Principle: one schema, many partitions + +Keep **one `ContextRecord` schema** as the namespace schema. Isolation is achieved by +*splitting rows across partitions*, never by giving each direction its own schema. This is +the single decision that keeps us out of "database with an information_schema" territory: + +- "memory", "knowledge base", "skills", "tool calls" are **values of a partition field** + (or scalar columns), **not** separate tables with separate schemas. +- The thing #37 called `ctx_meta` / `information_schema` is replaced by the spec's + `__manifest` table, which we do not design or maintain. + +### 5.2 Which dimensions go in the layout vs. the record + +This is the core guidance #90 asks for. + +> Put a dimension in the **partition layout** when you routinely scope an *entire* operation +> to one value of it, and you want that value's data physically isolated. Keep a dimension +> in **record fields/metadata** when it is high-cardinality, cross-cutting, or combined +> ad-hoc with other filters. + +| Dimension | Typical cardinality | Recommended placement | Rationale | +|---|---|---|---| +| `tenant` / org | low–med | **Partition** (`identity`) | Hard isolation boundary; per-tenant delete/retention/access; queries almost always tenant-scoped. | +| `environment` (prod/stage) | tiny | **Partition** (`identity`) | Never want stage data bleeding into prod recall. | +| `agent` / bot (`bot_id`) | low–med | **Partition** (`identity`) *if* agents need independent indexes/lifecycle; else **column** | `bot_id` is already a column. Partition only when isolation matters. | +| `source` / `kind` (memory \| knowledge \| skill \| tool_call) | tiny (enum) | **Partition** (`identity`) *or* scalar column | The #37 "directions". Partition when lifecycles diverge (e.g. KB is bulk-rebuilt, memory is append/prune). | +| `user` | high | **Column** + metadata; **`bucket(N)`** if isolation needed | Don't create millions of partitions; hash into N buckets if you must partition. | +| `session_id` | very high | **Column** (`session_id`) | Already a column; fine-grained, cross-cut by other filters. | +| `run_id`, `role`, `content_type`, `tags`, `confidence`, `source_uri` | high / cross-cutting | **Column / `metadata`** | Ad-hoc filter material; never a directory boundary. | +| time | continuous | **Column** (`created_at`); `day`/`month` transform only for cold archival | Recall filters by range, not by exact partition; reserve time partitions for archive tiers. | + +Rules of thumb: + +1. **Few coarse boundaries → partition.** Many fine values → filter. +2. **Physical needs → partition.** If you need an independent vector index, independent + compaction cadence, drop-to-delete, or per-scope ACLs at the storage layer, it must be a + partition. Pure logical filtering can stay a column. +3. **Cap the partition count.** Keep the total number of `dataset` tables in the low + thousands. For a dimension you need to isolate but whose cardinality is high, use + `bucket(field, N)` instead of `identity`. +4. **A dimension you partition on must be a typed column** in the shared schema (see + §5.5) — partition specs reference columns by field id, so they cannot read keys buried + in the `metadata` JSON blob. + +### 5.3 Recommended partition specs + +Three conventions, smallest-useful first. Field ids are stable strings. + +**A. Tenant-isolated (most common).** + +```json +{ + "id": 1, + "fields": [ + { "field_id": "tenant", "source_ids": [], + "transform": { "type": "identity" }, "result_type": { "type": "utf8" } } + ] +} +``` + +**B. Tenant → agent → source (multi-direction, answers #37).** + +```json +{ + "id": 1, + "fields": [ + { "field_id": "tenant", "source_ids": [], "transform": {"type": "identity"}, "result_type": {"type": "utf8"} }, + { "field_id": "agent", "source_ids": [], "transform": {"type": "identity"}, "result_type": {"type": "utf8"} }, + { "field_id": "source", "source_ids": [], "transform": {"type": "identity"}, "result_type": {"type": "utf8"} } + ] +} +``` + +`source ∈ {memory, knowledge, skill, tool_call}` — the #37 "directions" become partition +values, not tables. Order matters: outer levels are the coarsest, most-often-scoped +dimensions (the spec nests partition namespaces in field order). + +**C. High-cardinality user isolation via bucketing.** + +```json +{ + "id": 1, + "fields": [ + { "field_id": "tenant", "source_ids": [], "transform": {"type": "identity"}, "result_type": {"type": "utf8"} }, + { "field_id": "user_bucket", "source_ids": [], "transform": {"type": "bucket", "num_buckets": 64}, "result_type": {"type": "int32"} } + ] +} +``` + +Caps per-tenant partitions at 64 while still giving physical separation and enabling +storage-partitioned joins against other namespaces bucketed compatibly. + +### 5.4 Path / URI conventions (caller-facing) + +Per the spec, **physical** partition directories are random-id names like +`b4a3c2d1_v1$k7m2n9p4q8r5s3t6$dataset/`, and partition values live in `__manifest`. Callers +should never construct these by hand. What we standardize is the **caller-facing** address: + +- A **namespace root URI** — the directory containing `__manifest`, e.g. + `s3://bucket/acme-contexts/`. This replaces "one `.lance` dataset" as the top-level + handle when partitioning is in use. +- A **partition selector** — a dict of `{field_id: value}` resolved against the spec, e.g. + `{"tenant": "acme", "agent": "support-bot", "source": "memory"}`. + +```text +s3://bucket/acme-contexts/ # namespace root (holds __manifest) + └── (resolved by spec+manifest) ──► one Lance "dataset" table == a normal Context +``` + +Because partition values are stored in `__manifest`, **tenant/source ids may contain any +characters** (`/`, `:`, spaces) without escaping — a real advantage over encoding scope +into the path ourselves. + +### 5.5 Schema implication: promote partition keys to columns + +To partition by a dimension, it must be a first-class column referenced by `source_ids`. +Status of the candidate keys on `ContextRecord` today: + +- **Already columns** (usable now): `bot_id` (→ agent), `session_id`, `run_id`, `role`, + `content_type`, `created_at`. +- **Currently in `metadata` JSON** (would need promotion to typed columns to partition on): + `tenant`, `source`/`kind`, `environment`, `user_id`. + +Recommendation: when (and only when) we implement partitioning, add nullable typed columns +for the dimensions a deployment wants to partition by (start with `tenant` and `source`). +This is additive and backward compatible — existing single datasets simply leave them null. +Keeping these as real columns also makes them filterable in `list`/`search` regardless of +whether they are partitioned, which is strictly better than the JSON blob. + +### 5.6 Querying: single-partition fast path, cross-partition fan-out + +- **Single partition (the 95% path).** Resolve the selector to one `dataset` and open it as + today's `Context`. Full ACID, full speed, identical semantics to the current code. Vector + search, hybrid `retrieve`, `list`, lifecycle — all unchanged, just scoped. +- **Cross-partition (the unified view).** Two sub-cases: + - *Pruned scan*: a partial selector (e.g. `{"tenant": "acme"}`, all sources) → query + `__manifest` for matching `dataset` paths, then fan out and merge. For vector search, + run top-k per partition and merge by distance; for `retrieve`, fuse per-partition RRF + results. This is partition pruning from the spec applied to recall. + - *Full scan*: no selector → every partition. Discouraged for vector search at scale; + intended for analytics/export. + +Cross-partition reads are **not** isolated by default (each partition may be at a different +version). When a consistent multi-partition snapshot is required, use the spec's +`read_version`/`read_tag` columns on `__manifest`. We treat strongly-consistent +multi-partition transactions as a later, opt-in phase (§8). + +### 5.7 Helper API sketch (thin resolver, not a catalog) + +A small layer that (1) reads/writes `__manifest`, (2) resolves a selector to a `dataset` +URI, and (3) returns a normal `Context`. It deliberately exposes **no** create-table / +alter-schema / list-schemas surface — that would be the catalog we are avoiding. + +```python +from lance_context.namespace import ContextNamespace, PartitionSpec + +# Define once, at namespace creation. The schema is the single ContextRecord schema. +ns = ContextNamespace.create( + "s3://bucket/acme-contexts/", + partition_spec=PartitionSpec(fields=["tenant", "agent", "source"]), # spec B + storage_options={...}, +) + +# Open exactly one partition -> a normal Context (today's object, unchanged). +ctx = ns.context(tenant="acme", agent="support-bot", source="memory") +ctx.add("assistant", "runbook owner is the platform team", embedding=vec) + +# Partial selector -> cross-partition, pruned via __manifest. +hits = ns.search(vec, limit=10, where={"tenant": "acme"}) # all agents+sources +hits = ns.retrieve(text="runbook owner", vector=vec, limit=5, + where={"tenant": "acme", "source": "knowledge"}) + +# Introspection is read-only and comes straight from __manifest (no info_schema we own). +for part in ns.partitions(where={"tenant": "acme"}): + print(part.values, part.uri, part.version) +``` + +Rust mirror (sketch): + +```rust +let ns = ContextNamespace::create(root_uri, PartitionSpec::new(&["tenant", "agent", "source"])).await?; +let mut ctx = ns.context(&[("tenant","acme"), ("agent","support-bot"), ("source","memory")]).await?; +ctx.add(&records).await?; + +let hits = ns.search(&query, 10, &[("tenant","acme")]).await?; // fan-out + merge +``` + +`ContextNamespace` is optional sugar; opening a single partition directly by its resolved +dataset URI must keep working so nothing depends on the helper. + +## 6. When to partition vs. filter inside one context + +A decision aid for users (the doc #90 asks for): + +| Situation | Recommendation | +|---|---| +| One app, one team, < ~10M records, scopes only differ logically | **One `Context`**, use `metadata`/column filters. Don't partition. | +| Multiple tenants needing data isolation, independent deletion/retention, or per-tenant ACLs | **Partition by `tenant`.** | +| KB is bulk re-imported on a schedule while chat memory is append+prune | **Partition by `source`** so you can rebuild/compact/index each independently. | +| Millions of users, each needing isolation | **`bucket(user_id, N)`**, not a partition per user. | +| You filter by `session_id`, `tags`, `role`, time range, `confidence` | **Keep as columns/metadata**; never a partition. | +| You need vector indexes tuned differently per scope | **Partition** (each `dataset` has its own index). | +| You occasionally need a cross-scope view | Fine — that's the unified namespace; use a partial/empty selector. | + +Default remains: **start with one context and metadata filters.** Reach for partitions only +when a *physical* property (isolation, lifecycle, index, deletion, ACL) demands it. + +## 7. Migration & evolution + +### 7.1 Single dataset → partitioned namespace + +Existing single-dataset `Context`s keep working with no change; the namespace layer is +purely additive. To adopt partitioning: + +1. **New deployments**: create a `ContextNamespace` with `partition_spec_v1`; write through + it from day one. +2. **Existing single dataset**: stand up a namespace root, register the current dataset as + the initial partition (e.g. the `default`/legacy scope) in `__manifest`, and route new + scoped writes to new partitions. No bulk rewrite of existing rows is required to *start*; + backfilling the new partition-key columns can be incremental (rows without the key sort + into a null/`default` partition). + +### 7.2 Partition spec evolution + +Use spec versions exactly as the spec intends: + +- Start at `partition_spec_v1 = [tenant]`. +- Later add agent/source: introduce `partition_spec_v2 = [tenant, agent, source]`. New + writes land under `v2/`; `v1/` data stays queryable; cross-version queries reconcile via + each version's spec. **No migration of old partitions needed.** + +Because partition fields are referenced by stable field id, renaming the underlying column +later does not break existing specs. + +## 8. Interaction with existing features + +- **Versioning / time-travel**: each partition is a normal Lance dataset, so `checkout`, + `snapshot`, `fork` work per partition unchanged. Namespace-wide consistent snapshots are + the opt-in `read_version`/`read_tag` mechanism (Phase 3). +- **Compaction**: per partition (a key benefit — compact hot memory partitions without + touching a large static KB). +- **Vector / FTS indexes**: per partition; tune independently. Cross-partition search merges + per-partition results. +- **Delete / `forget`**: still a versioned logical tombstone within a partition; dropping an + entire partition becomes a clean physical delete path for "forget this whole tenant". +- **`storage_options`**: unchanged; applies to the namespace root and inherited by + partitions. +- **Relationships / GraphRAG**: edges are within-record and unaffected. Note: relationship + *traversal* is naturally partition-local; cross-partition edges resolve through the + fan-out path. Call this out as a known limitation, not a feature, for v1. + +## 9. Alternatives considered + +1. **Table families + `information_schema` (original #37).** Rejected per maintainer + guidance: per-table schemas and a hand-maintained registry are ongoing maintenance and + migration cost — i.e. building a database. +2. **Metadata filters only (status quo, forever).** Insufficient when a scope needs physical + isolation: no independent deletion/retention, no per-scope index tuning, no storage-level + ACL, and one giant dataset to compact. Great default, wrong ceiling. +3. **Many independent `.lance` datasets with an app-side map.** Loses the shared-schema + guarantee, the `__manifest` registry, partition pruning, and cross-scope query — every + consumer reinvents discovery. The partitioned namespace *is* the standardized version of + this, so we should use it rather than re-implement it badly. +4. **Partition by every dimension (incl. session/user).** Rejected: directory/partition + explosion. Hence the §5.2 cardinality rules and `bucket(N)`. + +## 10. Open questions + +- **Default `source`/`kind` vocabulary**: do we bless an enum (`memory`/`knowledge`/`skill`/ + `tool_call`) or leave it free-form? Leaning: document the convention, don't enforce. +- **Cross-partition vector search ranking**: top-k-per-partition + merge is simple but reads + every selected partition. Is a coarse L0-style summary partition (ties back to #37's L0) + worth it as a pre-filter? Probably a later optimization. +- **Manifest dependency**: do we depend on `lance-namespace` clients, or implement the + minimal `__manifest` read/resolve ourselves to keep the dependency surface small? Affects + Phase 2 scope. +- **Helper ergonomics**: is `ContextNamespace` the right primitive, or should `Context.open` + simply accept a `partition={…}` kwarg over a namespace root? + +## 11. Phased rollout + +Incremental, docs-first, each phase independently shippable (matches #90's acceptance +criteria): + +- **Phase 0 (this doc).** Conventions agreed: dimension placement (§5.2), partition specs + (§5.3), URI conventions (§5.4), when-to-partition guidance (§6). +- **Phase 1 — read/resolve helper.** Promote `tenant`/`source` to typed columns; a thin + `ContextNamespace` that creates `__manifest`, resolves a full selector to one `dataset`, + and returns today's `Context`. Single-partition only. No semantics change for existing + users. +- **Phase 2 — cross-partition reads.** Partition pruning over `__manifest`; fan-out + + merge for `search`/`retrieve`/`list`; read-only `partitions()` introspection. +- **Phase 3 — multi-partition consistency (opt-in).** `read_version`/`read_tag` commit + protocol for consistent cross-partition snapshots; whole-partition drop as physical + `forget`. + +## 12. Acceptance criteria mapping (#90) + +| #90 criterion | Where addressed | +|---|---| +| Propose upstream-friendly partition naming conventions | §5.3, §5.4 | +| Clarify path/namespace layout vs. record metadata | §5.2 (decision table + rules) | +| Follow-up implementation can be incremental (docs first / small helper) | §11 | +| Compatible with single-table model and #37 guidance | §5.1, §5.5, §8, §9 | + +## 13. References + +- Lance Partitioning Spec — +- Lance Namespace Spec — +- Issue #37 — table management & agent interfaces (and maintainer guidance) +- Issue #90 — explore partitioned namespace layout for scoped agent context diff --git a/python/python/lance_context/__init__.py b/python/python/lance_context/__init__.py index b6a356c..6f22999 100644 --- a/python/python/lance_context/__init__.py +++ b/python/python/lance_context/__init__.py @@ -3,8 +3,15 @@ from .api import ( # pyright: ignore[reportMissingImports] AsyncContext, Context, + ContextNamespace, EmbeddingProvider, __version__, ) -__all__ = ["AsyncContext", "Context", "EmbeddingProvider", "__version__"] +__all__ = [ + "AsyncContext", + "Context", + "ContextNamespace", + "EmbeddingProvider", + "__version__", +] diff --git a/python/python/lance_context/api.py b/python/python/lance_context/api.py index 24a642c..4e1d66f 100644 --- a/python/python/lance_context/api.py +++ b/python/python/lance_context/api.py @@ -9,10 +9,19 @@ from typing import Any from ._internal import Context as _Context # pyright: ignore[reportMissingImports] +from ._internal import ( # pyright: ignore[reportMissingImports] + ContextNamespace as _ContextNamespace, +) from ._internal import version as _version # pyright: ignore[reportMissingImports] from .embeddings import EmbeddingProvider, _build_provider -__all__ = ["AsyncContext", "Context", "EmbeddingProvider", "__version__"] +__all__ = [ + "AsyncContext", + "Context", + "ContextNamespace", + "EmbeddingProvider", + "__version__", +] __version__ = _version() @@ -138,6 +147,8 @@ def _normalize_record(raw: dict[str, Any]) -> dict[str, Any]: "run_id": raw.get("run_id"), "bot_id": raw.get("bot_id"), "session_id": raw.get("session_id"), + "tenant": raw.get("tenant"), + "source": raw.get("source"), "role": raw.get("role"), "content_type": raw.get("content_type"), "text": raw.get("text_payload"), @@ -374,6 +385,7 @@ def __init__( ) else: self._embedding_provider = embedding_provider + self._default_fields: dict[str, str] = {} @classmethod def create( @@ -438,6 +450,8 @@ def add( embedding: list[float] | None = None, bot_id: str | None = None, session_id: str | None = None, + tenant: str | None = None, + source: str | None = None, external_id: str | None = None, state_metadata: Mapping[str, Any] | None = None, metadata: dict[str, Any] | None = None, @@ -458,6 +472,15 @@ def add( provider = getattr(self, "_embedding_provider", None) if embedding is None and provider is not None and isinstance(payload, str): embedding = provider.embed_texts([payload])[0] + defaults = getattr(self, "_default_fields", {}) + if bot_id is None: + bot_id = defaults.get("bot_id") + if session_id is None: + session_id = defaults.get("session_id") + if tenant is None: + tenant = defaults.get("tenant") + if source is None: + source = defaults.get("source") self._inner.add( role, payload, @@ -465,6 +488,8 @@ def add( embedding, bot_id, session_id, + tenant, + source, external_id, _normalize_state_metadata(state_metadata), _json_dumps(metadata, "metadata"), @@ -487,6 +512,8 @@ def upsert( embedding: list[float] | None = None, bot_id: str | None = None, session_id: str | None = None, + tenant: str | None = None, + source: str | None = None, external_id: str | None = None, metadata: dict[str, Any] | None = None, relationships: list[dict[str, Any]] | None = None, @@ -512,6 +539,15 @@ def upsert( provider = getattr(self, "_embedding_provider", None) if embedding is None and provider is not None and isinstance(payload, str): embedding = provider.embed_texts([payload])[0] + defaults = getattr(self, "_default_fields", {}) + if bot_id is None: + bot_id = defaults.get("bot_id") + if session_id is None: + session_id = defaults.get("session_id") + if tenant is None: + tenant = defaults.get("tenant") + if source is None: + source = defaults.get("source") result = self._inner.upsert( role, payload, @@ -519,6 +555,8 @@ def upsert( embedding, bot_id, session_id, + tenant, + source, external_id, _json_dumps(metadata, "metadata"), _coerce_timestamp(expires_at, field_name="expires_at"), @@ -543,6 +581,8 @@ def update( external_id: str | None = None, bot_id: str | None = None, session_id: str | None = None, + tenant: str | None = None, + source: str | None = None, metadata: dict[str, Any] | None = None, relationships: list[dict[str, Any]] | None = None, expires_at: datetime | str | None = None, @@ -563,6 +603,8 @@ def update( if ( bot_id is None and session_id is None + and tenant is None + and source is None and metadata is None and relationships is None and expires_at is None @@ -579,6 +621,8 @@ def update( external_id, bot_id, session_id, + tenant, + source, _json_dumps(metadata, "metadata"), _json_dumps(relationships, "relationships"), _coerce_timestamp(expires_at, field_name="expires_at"), @@ -601,7 +645,8 @@ def add_many(self, records: Iterable[Mapping[str, Any]]) -> None: Each record accepts the same fields as :meth:`add`: ``role``, ``content``, optional ``content_type``/``data_type``, ``embedding``, - ``bot_id``, ``session_id``, ``external_id``, ``state_metadata``, + ``bot_id``, ``session_id``, ``tenant``, ``source``, ``external_id``, + ``state_metadata``, ``metadata``, ``relationships``, and lifecycle fields such as ``expires_at`` and ``lifecycle_status``. """ @@ -630,8 +675,19 @@ def add_many(self, records: Iterable[Mapping[str, Any]]) -> None: "content": payload, "data_type": resolved_type, "embedding": record.get("embedding"), - "bot_id": record.get("bot_id"), - "session_id": record.get("session_id"), + "bot_id": record.get( + "bot_id", getattr(self, "_default_fields", {}).get("bot_id") + ), + "session_id": record.get( + "session_id", + getattr(self, "_default_fields", {}).get("session_id"), + ), + "tenant": record.get( + "tenant", getattr(self, "_default_fields", {}).get("tenant") + ), + "source": record.get( + "source", getattr(self, "_default_fields", {}).get("source") + ), "external_id": record.get("external_id"), "state_metadata": _normalize_state_metadata( record.get("state_metadata") @@ -681,7 +737,7 @@ def snapshot(self, label: str | None = None) -> str: def fork(self, branch_name: str) -> Context: inner = self._inner.fork(branch_name) - return self._from_inner(inner, self._embedding_provider) + return self._from_inner(inner, self._embedding_provider, self._default_fields) def checkout(self, version_id: int | str) -> None: self._inner.checkout(int(version_id)) @@ -755,8 +811,8 @@ def list( limit: Maximum number of entries to return. If None, returns all. offset: Number of entries to skip before returning results. filters: Optional equality filters for built-in fields - (bot_id, session_id, role, content_type), created_at range - filters, or metadata fields. + (bot_id, session_id, tenant, source, role, content_type), + created_at range filters, or metadata fields. include_expired: Include records whose ``expires_at`` is in the past. include_retired: Include retired/superseded/revoked records. @@ -883,14 +939,114 @@ def __repr__(self) -> str: @classmethod def _from_inner( - cls, inner: _Context, embedding_provider: EmbeddingProvider | None = None + cls, + inner: _Context, + embedding_provider: EmbeddingProvider | None = None, + default_fields: Mapping[str, str] | None = None, ) -> Context: obj = cls.__new__(cls) obj._inner = inner obj._embedding_provider = embedding_provider + obj._default_fields = dict(default_fields or {}) return obj +class ContextNamespace: + """Resolver for physically isolated context partitions under one namespace root.""" + + def __init__( + self, + root_uri: str, + fields: Iterable[str], + *, + storage_options: dict[str, Any] | None = None, + aws_access_key_id: str | None = None, + aws_secret_access_key: str | None = None, + aws_session_token: str | None = None, + region: str | None = None, + endpoint_url: str | None = None, + allow_http: bool = False, + enable_background_compaction: bool = False, + compaction_interval_secs: int = 300, + compaction_min_fragments: int = 5, + compaction_target_rows: int = 1_000_000, + quiet_hours: list[tuple[int, int]] | None = None, + id_index_type: str | None = None, + embedding_dim: int | None = None, + distance_metric: str | None = None, + embedding_provider: EmbeddingProvider | dict[str, Any] | None = None, + ) -> None: + options = _merge_storage_options( + storage_options, + aws_access_key_id=aws_access_key_id, + aws_secret_access_key=aws_secret_access_key, + aws_session_token=aws_session_token, + region=region, + endpoint_url=endpoint_url, + allow_http=allow_http, + ) + + compaction_config = { + "enabled": enable_background_compaction, + "check_interval_secs": compaction_interval_secs, + "min_fragments": compaction_min_fragments, + "target_rows_per_fragment": compaction_target_rows, + "quiet_hours": quiet_hours or [], + } + + field_list = list(fields) + if ( + options + or compaction_config["enabled"] + or id_index_type + or embedding_dim is not None + or distance_metric + ): + self._inner = _ContextNamespace.create( + root_uri, + field_list, + storage_options=options or None, + compaction_config=compaction_config, + id_index_type=id_index_type, + embedding_dim=embedding_dim, + distance_metric=distance_metric, + ) + else: + self._inner = _ContextNamespace.create(root_uri, field_list) + + if isinstance(embedding_provider, dict): + self._embedding_provider: EmbeddingProvider | None = _build_provider( + embedding_provider + ) + else: + self._embedding_provider = embedding_provider + + @classmethod + def create( + cls, + root_uri: str, + fields: Iterable[str], + **kwargs: Any, + ) -> ContextNamespace: + return cls(root_uri, fields, **kwargs) + + def root_uri(self) -> str: + return self._inner.root_uri() + + def manifest_uri(self) -> str: + return self._inner.manifest_uri() + + def partition_uri(self, **selector: str) -> str: + return self._inner.partition_uri(selector) + + def context(self, **selector: str) -> Context: + inner = self._inner.context(selector) + return Context._from_inner(inner, self._embedding_provider, selector) + + def partitions(self) -> list[dict[str, Any]]: + return list(self._inner.partitions()) + + class AsyncContext: """Async wrapper around :class:`Context`. @@ -942,6 +1098,8 @@ async def add( embedding: list[float] | None = None, bot_id: str | None = None, session_id: str | None = None, + tenant: str | None = None, + source: str | None = None, external_id: str | None = None, state_metadata: Mapping[str, Any] | None = None, metadata: dict[str, Any] | None = None, @@ -965,6 +1123,8 @@ async def add( embedding=embedding, bot_id=bot_id, session_id=session_id, + tenant=tenant, + source=source, external_id=external_id, state_metadata=state_metadata, metadata=metadata, @@ -988,6 +1148,8 @@ async def upsert( embedding: list[float] | None = None, bot_id: str | None = None, session_id: str | None = None, + tenant: str | None = None, + source: str | None = None, external_id: str | None = None, metadata: dict[str, Any] | None = None, relationships: list[dict[str, Any]] | None = None, @@ -1010,6 +1172,8 @@ async def upsert( embedding=embedding, bot_id=bot_id, session_id=session_id, + tenant=tenant, + source=source, external_id=external_id, metadata=metadata, relationships=relationships, @@ -1029,6 +1193,8 @@ async def update( external_id: str | None = None, bot_id: str | None = None, session_id: str | None = None, + tenant: str | None = None, + source: str | None = None, metadata: dict[str, Any] | None = None, relationships: list[dict[str, Any]] | None = None, expires_at: datetime | str | None = None, @@ -1045,6 +1211,8 @@ async def update( external_id=external_id, bot_id=bot_id, session_id=session_id, + tenant=tenant, + source=source, metadata=metadata, relationships=relationships, expires_at=expires_at, diff --git a/python/src/lib.rs b/python/src/lib.rs index d142499..a930f44 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -1,6 +1,6 @@ #![recursion_limit = "256"] -use std::collections::{HashMap, HashSet}; +use std::collections::{BTreeMap, HashMap, HashSet}; use std::sync::Arc; use chrono::{DateTime, SecondsFormat, Utc}; @@ -13,10 +13,11 @@ use tokio::runtime::Runtime; use lance_context_core::serde::CONTENT_TYPE_TEXT; use lance_context_core::{ - CompactionConfig, CompactionMetrics, CompactionStats, Context as RustContext, ContextRecord, - ContextStore, ContextStoreOptions, DistanceMetric, IdIndexType, LifecycleQueryOptions, - RecordFilters, RecordPatch, Relationship, RetrieveResult, SearchResult, StateMetadata, - LIFECYCLE_ACTIVE, + CompactionConfig, CompactionMetrics, CompactionStats, Context as RustContext, + ContextNamespace as RustContextNamespace, ContextRecord, ContextStore, ContextStoreOptions, + DistanceMetric, IdIndexType, LifecycleQueryOptions, PartitionInfo, PartitionSelector, + PartitionSpec, RecordFilters, RecordPatch, Relationship, RetrieveResult, SearchResult, + StateMetadata, LIFECYCLE_ACTIVE, }; const DEFAULT_BINARY_CONTENT_TYPE: &str = "application/octet-stream"; @@ -35,6 +36,8 @@ struct RecordInput { embedding: Option>, bot_id: Option, session_id: Option, + tenant: Option, + source: Option, external_id: Option, state_metadata: Option, metadata_json: Option, @@ -66,6 +69,12 @@ struct Context { run_id: String, } +#[pyclass] +struct ContextNamespace { + inner: RustContextNamespace, + runtime: Arc, +} + fn storage_options_from_dict<'py>( dict: Option<&Bound<'py, PyDict>>, ) -> PyResult>> { @@ -143,6 +152,43 @@ fn compaction_config_from_dict<'py>( Ok(config) } +fn context_options_from_py<'py>( + storage_options: Option<&Bound<'py, PyDict>>, + compaction_config: Option<&Bound<'py, PyDict>>, + blob_columns: Option>, + id_index_type: Option, + embedding_dim: Option, + distance_metric: Option, +) -> PyResult { + let blob_set: HashSet = blob_columns.unwrap_or_default().into_iter().collect(); + + let id_idx = match id_index_type.as_deref() { + Some("btree") => IdIndexType::BTree, + Some("zonemap") => IdIndexType::ZoneMap, + Some("none") | None => IdIndexType::None, + Some(other) => { + return Err(PyRuntimeError::new_err(format!( + "invalid id_index_type '{}': valid values are 'btree', 'zonemap'", + other + ))) + } + }; + + let metric = match distance_metric.as_deref() { + Some(value) => Some(DistanceMetric::parse(value).map_err(to_py_err)?), + None => None, + }; + + Ok(ContextStoreOptions { + storage_options: storage_options_from_dict(storage_options)?, + compaction: compaction_config_from_dict(compaction_config)?, + embedding_dim, + blob_columns: blob_set, + id_index_type: id_idx, + distance_metric: metric, + }) +} + fn metadata_from_json(metadata_json: Option) -> PyResult> { metadata_json .map(|value| serde_json::from_str(&value).map_err(to_py_err)) @@ -166,6 +212,17 @@ fn filters_from_json(filters_json: Option) -> PyResult) -> PyResult { + let mut selector = BTreeMap::new(); + for (key, value) in dict.iter() { + if value.is_none() { + continue; + } + selector.insert(key.extract::()?, value.extract::()?); + } + Ok(selector) +} + #[pymethods] impl Context { #[classmethod] @@ -183,34 +240,14 @@ impl Context { distance_metric: Option, ) -> PyResult { let runtime = Arc::new(Runtime::new().map_err(to_py_err)?); - - let blob_set: HashSet = blob_columns.unwrap_or_default().into_iter().collect(); - - let id_idx = match id_index_type.as_deref() { - Some("btree") => IdIndexType::BTree, - Some("zonemap") => IdIndexType::ZoneMap, - Some("none") | None => IdIndexType::None, - Some(other) => { - return Err(PyRuntimeError::new_err(format!( - "invalid id_index_type '{}': valid values are 'btree', 'zonemap'", - other - ))) - } - }; - - let metric = match distance_metric.as_deref() { - Some(value) => Some(DistanceMetric::parse(value).map_err(to_py_err)?), - None => None, - }; - - let options = ContextStoreOptions { - storage_options: storage_options_from_dict(storage_options)?, - compaction: compaction_config_from_dict(compaction_config)?, + let options = context_options_from_py( + storage_options, + compaction_config, + blob_columns, + id_index_type, embedding_dim, - blob_columns: blob_set, - id_index_type: id_idx, - distance_metric: metric, - }; + distance_metric, + )?; let store_res = py.allow_threads(|| runtime.block_on(ContextStore::open_with_options(uri, options))); @@ -241,7 +278,7 @@ impl Context { } #[allow(clippy::too_many_arguments)] - #[pyo3(signature = (role, content, data_type = None, embedding = None, bot_id = None, session_id = None, external_id = None, state_metadata = None, metadata_json = None, expires_at = None, retention_policy = None, lifecycle_status = None, retired_at = None, retired_reason = None, supersedes_id = None, superseded_by_id = None, relationships_json = None))] + #[pyo3(signature = (role, content, data_type = None, embedding = None, bot_id = None, session_id = None, tenant = None, source = None, external_id = None, state_metadata = None, metadata_json = None, expires_at = None, retention_policy = None, lifecycle_status = None, retired_at = None, retired_reason = None, supersedes_id = None, superseded_by_id = None, relationships_json = None))] fn add( &mut self, py: Python<'_>, @@ -251,6 +288,8 @@ impl Context { embedding: Option>, bot_id: Option, session_id: Option, + tenant: Option, + source: Option, external_id: Option, state_metadata: Option<&Bound<'_, PyDict>>, metadata_json: Option, @@ -280,6 +319,8 @@ impl Context { embedding, bot_id, session_id, + tenant, + source, external_id, state_metadata: state_metadata_from_dict(state_metadata)?, metadata_json, @@ -303,7 +344,7 @@ impl Context { } #[allow(clippy::too_many_arguments)] - #[pyo3(signature = (role, content, data_type = None, embedding = None, bot_id = None, session_id = None, external_id = None, metadata_json = None, expires_at = None, retention_policy = None, lifecycle_status = None, retired_at = None, retired_reason = None, relationships_json = None, key = "external_id"))] + #[pyo3(signature = (role, content, data_type = None, embedding = None, bot_id = None, session_id = None, tenant = None, source = None, external_id = None, metadata_json = None, expires_at = None, retention_policy = None, lifecycle_status = None, retired_at = None, retired_reason = None, relationships_json = None, key = "external_id"))] fn upsert( &mut self, py: Python<'_>, @@ -313,6 +354,8 @@ impl Context { embedding: Option>, bot_id: Option, session_id: Option, + tenant: Option, + source: Option, external_id: Option, metadata_json: Option, expires_at: Option, @@ -351,6 +394,8 @@ impl Context { embedding, bot_id, session_id, + tenant, + source, external_id, state_metadata: None, metadata_json, @@ -380,7 +425,7 @@ impl Context { } #[allow(clippy::too_many_arguments)] - #[pyo3(signature = (id = None, external_id = None, bot_id = None, session_id = None, metadata_json = None, relationships_json = None, expires_at = None, retention_policy = None, lifecycle_status = None, retired_at = None, retired_reason = None, embedding = None))] + #[pyo3(signature = (id = None, external_id = None, bot_id = None, session_id = None, tenant = None, source = None, metadata_json = None, relationships_json = None, expires_at = None, retention_policy = None, lifecycle_status = None, retired_at = None, retired_reason = None, embedding = None))] fn update( &mut self, py: Python<'_>, @@ -388,6 +433,8 @@ impl Context { external_id: Option, bot_id: Option, session_id: Option, + tenant: Option, + source: Option, metadata_json: Option, relationships_json: Option, expires_at: Option, @@ -400,6 +447,8 @@ impl Context { let patch = RecordPatch { bot_id, session_id, + tenant, + source, state_metadata: None, metadata: metadata_from_json(metadata_json)?, relationships: relationships_patch_from_json(relationships_json)?, @@ -735,6 +784,83 @@ impl Context { } } +#[pymethods] +impl ContextNamespace { + #[classmethod] + #[allow(clippy::too_many_arguments)] + #[pyo3(signature = (root_uri, fields, *, storage_options=None, compaction_config=None, blob_columns=None, id_index_type=None, embedding_dim=None, distance_metric=None))] + fn create( + _cls: &Bound<'_, PyType>, + py: Python<'_>, + root_uri: &str, + fields: Vec, + storage_options: Option<&Bound<'_, PyDict>>, + compaction_config: Option<&Bound<'_, PyDict>>, + blob_columns: Option>, + id_index_type: Option, + embedding_dim: Option, + distance_metric: Option, + ) -> PyResult { + let runtime = Arc::new(Runtime::new().map_err(to_py_err)?); + let spec = PartitionSpec::new(fields).map_err(to_py_err)?; + let options = context_options_from_py( + storage_options, + compaction_config, + blob_columns, + id_index_type, + embedding_dim, + distance_metric, + )?; + let inner = py.allow_threads(|| { + runtime.block_on(RustContextNamespace::create_with_options( + root_uri, spec, options, + )) + }); + Ok(Self { + inner: inner.map_err(to_py_err)?, + runtime, + }) + } + + fn root_uri(&self) -> &str { + self.inner.root_uri() + } + + fn manifest_uri(&self) -> String { + self.inner.manifest_uri() + } + + fn partition_uri(&self, selector: &Bound<'_, PyDict>) -> PyResult { + let selector = selector_from_dict(selector)?; + Ok(self + .inner + .resolve_partition(&selector) + .map_err(to_py_err)? + .dataset_uri) + } + + fn context(&self, py: Python<'_>, selector: &Bound<'_, PyDict>) -> PyResult { + let selector = selector_from_dict(selector)?; + let partition = self.inner.resolve_partition(&selector).map_err(to_py_err)?; + let store = py.allow_threads(|| self.runtime.block_on(self.inner.context(&selector))); + Ok(Context { + inner: RustContext::new(partition.dataset_uri), + store: store.map_err(to_py_err)?, + runtime: Arc::clone(&self.runtime), + run_id: new_run_id(), + }) + } + + fn partitions(&self, py: Python<'_>) -> PyResult> { + let partitions = py.allow_threads(|| self.runtime.block_on(self.inner.partitions())); + partitions + .map_err(to_py_err)? + .into_iter() + .map(|partition| partition_info_to_py(py, partition)) + .collect() + } +} + impl Context { fn prepare_record_from_dict( &self, @@ -747,14 +873,14 @@ impl Context { let embedding = optional_item(dict, "embedding")?.map(|value| value.extract::>()); let bot_id = optional_item(dict, "bot_id")?.map(|value| value.extract::()); let session_id = optional_item(dict, "session_id")?.map(|value| value.extract::()); + let tenant = optional_item(dict, "tenant")?.map(|value| value.extract::()); + let source = optional_item(dict, "source")?.map(|value| value.extract::()); let external_id = optional_item(dict, "external_id")?.map(|value| value.extract::()); let state_metadata = match optional_item(dict, "state_metadata")? { Some(value) => { let metadata = value.downcast::().map_err(|_| { - PyTypeError::new_err(format!( - "records[{index}].state_metadata must be a dict" - )) + PyTypeError::new_err(format!("records[{index}].state_metadata must be a dict")) })?; state_metadata_from_dict(Some(metadata))? } @@ -795,6 +921,8 @@ impl Context { embedding: embedding.transpose()?, bot_id: bot_id.transpose()?, session_id: session_id.transpose()?, + tenant: tenant.transpose()?, + source: source.transpose()?, external_id: external_id.transpose()?, state_metadata, metadata_json: metadata_json.transpose()?, @@ -818,6 +946,8 @@ impl Context { embedding, bot_id, session_id, + tenant, + source, external_id, state_metadata, metadata_json, @@ -857,6 +987,8 @@ impl Context { run_id: self.run_id.clone(), bot_id, session_id, + tenant, + source, created_at: Utc::now(), role: role.clone(), state_metadata, @@ -1021,6 +1153,8 @@ fn record_to_py(py: Python<'_>, record: ContextRecord) -> PyResult { run_id, bot_id, session_id, + tenant, + source, created_at, role, state_metadata, @@ -1045,6 +1179,8 @@ fn record_to_py(py: Python<'_>, record: ContextRecord) -> PyResult { dict.set_item("run_id", run_id)?; dict.set_item("bot_id", bot_id)?; dict.set_item("session_id", session_id)?; + dict.set_item("tenant", tenant)?; + dict.set_item("source", source)?; dict.set_item( "created_at", created_at.to_rfc3339_opts(SecondsFormat::Micros, true), @@ -1104,6 +1240,19 @@ fn relationships_to_py(py: Python<'_>, relationships: Vec) -> PyRe Ok(list.into_pyobject(py)?.unbind().into()) } +fn partition_info_to_py(py: Python<'_>, partition: PartitionInfo) -> PyResult { + let dict = PyDict::new(py); + let selector = PyDict::new(py); + for (key, value) in partition.selector { + selector.set_item(key, value)?; + } + dict.set_item("partition_id", partition.partition_id)?; + dict.set_item("spec_version", partition.spec_version)?; + dict.set_item("selector", selector)?; + dict.set_item("dataset_uri", partition.dataset_uri)?; + Ok(dict.into_pyobject(py)?.unbind().into()) +} + fn json_value_to_py(py: Python<'_>, value: &Value) -> PyResult { let json = PyModule::import(py, "json")?; Ok(json.call_method1("loads", (value.to_string(),))?.unbind()) @@ -1117,5 +1266,6 @@ fn to_py_err(err: E) -> PyErr { fn _internal(m: &Bound<'_, PyModule>) -> PyResult<()> { m.add_function(wrap_pyfunction!(version, m)?)?; m.add_class::()?; + m.add_class::()?; Ok(()) }