diff --git a/baml_language/crates/baml_builtins2/baml_std/baml/ns_panics/panics.baml b/baml_language/crates/baml_builtins2/baml_std/baml/ns_panics/panics.baml index 1872841641..3a2bc4d245 100644 --- a/baml_language/crates/baml_builtins2/baml_std/baml/ns_panics/panics.baml +++ b/baml_language/crates/baml_builtins2/baml_std/baml/ns_panics/panics.baml @@ -23,6 +23,11 @@ class Unreachable { message string } +/// A cancellation was requested. +class Cancelled { + message string +} + /// A user-caused panic from `baml.sys.panic`. class UserPanic { message string @@ -35,4 +40,4 @@ class AllocFailure { message string } -type Panic = DivisionByZero | IndexOutOfBounds | MapKeyNotFound | StackOverflow | AssertionFailed | Unreachable | UserPanic | AllocFailure +type Panic = DivisionByZero | IndexOutOfBounds | MapKeyNotFound | StackOverflow | AssertionFailed | Unreachable | Cancelled | UserPanic | AllocFailure diff --git a/baml_language/crates/baml_builtins2_codegen/src/codegen_io.rs b/baml_language/crates/baml_builtins2_codegen/src/codegen_io.rs index 995802404e..de91eefaa5 100644 --- a/baml_language/crates/baml_builtins2_codegen/src/codegen_io.rs +++ b/baml_language/crates/baml_builtins2_codegen/src/codegen_io.rs @@ -1880,6 +1880,9 @@ pub fn generate_io_adapter( let resolve_fn = emit_resolve_helper(); let tokens = quote! { + // Bring `HeapPermit::proof()` into scope for the adapter impl below. + use ::bex_heap::HeapPermit as _; + #resolve_fn #adapter_struct #adapter_impl diff --git a/baml_language/crates/baml_lsp_server/src/playground_server.rs b/baml_language/crates/baml_lsp_server/src/playground_server.rs index 3c98508333..7956bd0c15 100644 --- a/baml_language/crates/baml_lsp_server/src/playground_server.rs +++ b/baml_language/crates/baml_lsp_server/src/playground_server.rs @@ -23,6 +23,7 @@ use axum::{ routing::get, }; use base64::Engine as _; +use bex_project::{is_cancelled_engine_error, is_cancelled_runtime_error}; use futures::{SinkExt, stream::StreamExt}; use prost::Message; use tokio::{net::TcpListener, sync::broadcast}; @@ -294,10 +295,7 @@ async fn handle_ws_in_message( } } Err(e) => { - let is_cancelled = matches!( - &e, - bex_project::RuntimeError::Engine(bex_project::EngineError::Cancelled) - ); + let is_cancelled = is_cancelled_runtime_error(&e); WsOutMessage::CallFunctionError { id, error: format!("{e}"), @@ -355,7 +353,7 @@ async fn handle_ws_in_message( } } Err(e) => { - let is_cancelled = matches!(&e, bex_project::EngineError::Cancelled); + let is_cancelled = is_cancelled_engine_error(&e); WsOutMessage::CallFunctionError { id, error: format!("{e}"), diff --git a/baml_language/crates/baml_tests/snapshots/__baml_std__/baml_tests____baml_std____03_hir.snap b/baml_language/crates/baml_tests/snapshots/__baml_std__/baml_tests____baml_std____03_hir.snap index be8e565851..c6a9485730 100644 --- a/baml_language/crates/baml_tests/snapshots/__baml_std__/baml_tests____baml_std____03_hir.snap +++ b/baml_language/crates/baml_tests/snapshots/__baml_std__/baml_tests____baml_std____03_hir.snap @@ -376,6 +376,9 @@ class baml.panics.AllocFailure { class baml.panics.AssertionFailed { message: string } +class baml.panics.Cancelled { + message: string +} class baml.panics.DivisionByZero { dividend: int } @@ -395,7 +398,7 @@ class baml.panics.Unreachable { class baml.panics.UserPanic { message: string } -type baml.panics.Panic = baml.panics.DivisionByZero | baml.panics.IndexOutOfBounds | baml.panics.MapKeyNotFound | baml.panics.StackOverflow | baml.panics.AssertionFailed | baml.panics.Unreachable | baml.panics.UserPanic | baml.panics.AllocFailure +type baml.panics.Panic = baml.panics.DivisionByZero | baml.panics.IndexOutOfBounds | baml.panics.MapKeyNotFound | baml.panics.StackOverflow | baml.panics.AssertionFailed | baml.panics.Unreachable | baml.panics.Cancelled | baml.panics.UserPanic | baml.panics.AllocFailure --- /baml/ns_stream/stream.baml --- class baml.stream.StreamFinished { diff --git a/baml_language/crates/baml_tests/snapshots/__baml_std__/baml_tests____baml_std____04_tir.snap b/baml_language/crates/baml_tests/snapshots/__baml_std__/baml_tests____baml_std____04_tir.snap index f2d577a582..bacc44f1ce 100644 --- a/baml_language/crates/baml_tests/snapshots/__baml_std__/baml_tests____baml_std____04_tir.snap +++ b/baml_language/crates/baml_tests/snapshots/__baml_std__/baml_tests____baml_std____04_tir.snap @@ -851,13 +851,16 @@ class baml.panics.AssertionFailed { class baml.panics.Unreachable { message: string } +class baml.panics.Cancelled { + message: string +} class baml.panics.UserPanic { message: string } class baml.panics.AllocFailure { message: string } -type baml.panics.Panic = baml.panics.DivisionByZero | baml.panics.IndexOutOfBounds | baml.panics.MapKeyNotFound | baml.panics.StackOverflow | baml.panics.AssertionFailed | baml.panics.Unreachable | baml.panics.UserPanic | baml.panics.AllocFailure +type baml.panics.Panic = baml.panics.DivisionByZero | baml.panics.IndexOutOfBounds | baml.panics.MapKeyNotFound | baml.panics.StackOverflow | baml.panics.AssertionFailed | baml.panics.Unreachable | baml.panics.Cancelled | baml.panics.UserPanic | baml.panics.AllocFailure class baml.panics.DivisionByZero$stream { dividend: null | int } @@ -877,13 +880,16 @@ class baml.panics.AssertionFailed$stream { class baml.panics.Unreachable$stream { message: null | string } +class baml.panics.Cancelled$stream { + message: null | string +} class baml.panics.UserPanic$stream { message: null | string } class baml.panics.AllocFailure$stream { message: null | string } -type baml.panics.Panic$stream = baml.panics.DivisionByZero$stream | baml.panics.IndexOutOfBounds$stream | baml.panics.MapKeyNotFound$stream | baml.panics.StackOverflow$stream | baml.panics.AssertionFailed$stream | baml.panics.Unreachable$stream | baml.panics.UserPanic$stream | baml.panics.AllocFailure$stream +type baml.panics.Panic$stream = baml.panics.DivisionByZero$stream | baml.panics.IndexOutOfBounds$stream | baml.panics.MapKeyNotFound$stream | baml.panics.StackOverflow$stream | baml.panics.AssertionFailed$stream | baml.panics.Unreachable$stream | baml.panics.Cancelled$stream | baml.panics.UserPanic$stream | baml.panics.AllocFailure$stream --- /baml/ns_stream/stream.baml --- class baml.stream.StreamFinished { diff --git a/baml_language/crates/baml_tests/src/compiler2_tir/snapshots/baml_tests__compiler2_tir__phase5__snapshot_baml_package_items.snap b/baml_language/crates/baml_tests/src/compiler2_tir/snapshots/baml_tests__compiler2_tir__phase5__snapshot_baml_package_items.snap index e7aaccd248..e10189e3c7 100644 --- a/baml_language/crates/baml_tests/src/compiler2_tir/snapshots/baml_tests__compiler2_tir__phase5__snapshot_baml_package_items.snap +++ b/baml_language/crates/baml_tests/src/compiler2_tir/snapshots/baml_tests__compiler2_tir__phase5__snapshot_baml_package_items.snap @@ -88,6 +88,7 @@ namespace baml.net: namespace baml.panics: class AllocFailure { methods: [] } class AssertionFailed { methods: [] } + class Cancelled { methods: [] } class DivisionByZero { methods: [] } class IndexOutOfBounds { methods: [] } class MapKeyNotFound { methods: [] } diff --git a/baml_language/crates/baml_tests/tests/bytecode_format/snapshots/bytecode_format__bytecode_display_expanded.snap b/baml_language/crates/baml_tests/tests/bytecode_format/snapshots/bytecode_format__bytecode_display_expanded.snap index d8d6bf398c..e82e983040 100644 --- a/baml_language/crates/baml_tests/tests/bytecode_format/snapshots/bytecode_format__bytecode_display_expanded.snap +++ b/baml_language/crates/baml_tests/tests/bytecode_format/snapshots/bytecode_format__bytecode_display_expanded.snap @@ -113,7 +113,7 @@ function testing.TestCollector.find_testset(self: null, name: string) -> testing } function testing.TestCollector.new(prefix: string) -> testing.TestCollector { - 23 0 alloc_instance 117 (TestCollector) + 23 0 alloc_instance 119 (TestCollector) 1 load_var 1 (prefix) 2 init_field 0 (prefix) 3 alloc_array 0 @@ -185,7 +185,7 @@ function testing.TestCollector.register_test(self: null, name: string, body: () 34 54 load_var 1 (self) 55 load_field 1 (tests) - 56 alloc_instance 109 (TestRegistration) + 56 alloc_instance 111 (TestRegistration) 57 load_var 11 (final_name) 58 init_field 0 (name) 59 load_var 3 (body) @@ -289,7 +289,7 @@ function testing.TestCollector.register_test_set(self: null, name: string, colle 45 54 load_var 1 (self) 55 load_field 2 (testsets) - 56 alloc_instance 111 (TestSetRegistration) + 56 alloc_instance 113 (TestSetRegistration) 57 load_var 11 (final_name) 58 init_field 0 (name) 59 load_var 3 (collector) @@ -416,7 +416,7 @@ function testing.TestRegistry.expand_set(self: null, name: string) -> (testing.S 147 68 call 67 (baml.sys.now_ms) 69 store_var 6 (start) - 148 70 alloc_instance 117 (TestCollector) + 148 70 alloc_instance 119 (TestCollector) 71 load_var 2 (name) 72 init_field 0 (prefix) 73 alloc_array 0 @@ -438,7 +438,7 @@ function testing.TestRegistry.expand_set(self: null, name: string) -> (testing.S 86 load_field 1 (expansions) 87 load_var 2 (name) - 151 88 alloc_instance 116 (TestRegistry) + 151 88 alloc_instance 118 (TestRegistry) 152 89 load_var 7 (sub_collector) 90 init_field 0 (collector) @@ -459,7 +459,7 @@ function testing.TestRegistry.expand_set(self: null, name: string) -> (testing.S } function testing.TestRegistry.new(collector: testing.TestCollector) -> testing.TestRegistry { - 87 0 alloc_instance 116 (TestRegistry) + 87 0 alloc_instance 118 (TestRegistry) 1 load_var 1 (collector) 2 init_field 0 (collector) @@ -616,7 +616,7 @@ function testing.TestRegistry.serialize(self: null) -> (testing.SerializedTest | 45 pop_jump_if_false +30 (to 75) 115 46 load_var 2 (items) - 47 alloc_instance 110 (SerializedTest) + 47 alloc_instance 112 (SerializedTest) 48 load_const 3 ("lazyTestSet") 49 init_field 0 (type) 50 load_var 7 (ts) @@ -638,7 +638,7 @@ function testing.TestRegistry.serialize(self: null) -> (testing.SerializedTest | 63 store_var 11 (_26) 108 64 load_var 2 (items) - 65 alloc_instance 108 (SerializedTestSet) + 65 alloc_instance 110 (SerializedTestSet) 66 load_var 10 (_25) 67 init_field 0 (name) 68 load_var 11 (_26) @@ -657,7 +657,7 @@ function testing.TestRegistry.serialize(self: null) -> (testing.SerializedTest | 79 jump -59 (to 20) 102 80 load_var 2 (items) - 81 alloc_instance 110 (SerializedTest) + 81 alloc_instance 112 (SerializedTest) 82 load_const 5 ("test") 83 init_field 0 (type) @@ -681,7 +681,7 @@ function testing.run_test(body: () -> void throws unknown, runner: ((() -> testi 2 store_var 1 173 3 load_var 1 (body) - 4 make_closure 333 1 + 4 make_closure 335 1 5 store_var 3 (base_run) 188 6 load_var 2 (runner) diff --git a/baml_language/crates/baml_tests/tests/bytecode_format/snapshots/bytecode_format__bytecode_display_expanded_unoptimized.snap b/baml_language/crates/baml_tests/tests/bytecode_format/snapshots/bytecode_format__bytecode_display_expanded_unoptimized.snap index a9036e778d..6c1d1d7a82 100644 --- a/baml_language/crates/baml_tests/tests/bytecode_format/snapshots/bytecode_format__bytecode_display_expanded_unoptimized.snap +++ b/baml_language/crates/baml_tests/tests/bytecode_format/snapshots/bytecode_format__bytecode_display_expanded_unoptimized.snap @@ -115,7 +115,7 @@ function testing.TestCollector.find_testset(self: null, name: string) -> testing } function testing.TestCollector.new(prefix: string) -> testing.TestCollector { - 23 0 alloc_instance 117 (TestCollector) + 23 0 alloc_instance 119 (TestCollector) 1 load_var 1 (prefix) 2 init_field 0 (prefix) 3 alloc_array 0 @@ -189,7 +189,7 @@ function testing.TestCollector.register_test(self: null, name: string, body: () 34 54 load_var 1 (self) 55 load_field 1 (tests) - 56 alloc_instance 109 (TestRegistration) + 56 alloc_instance 111 (TestRegistration) 57 load_var 12 (final_name) 58 init_field 0 (name) 59 load_var 3 (body) @@ -295,7 +295,7 @@ function testing.TestCollector.register_test_set(self: null, name: string, colle 45 54 load_var 1 (self) 55 load_field 2 (testsets) - 56 alloc_instance 111 (TestSetRegistration) + 56 alloc_instance 113 (TestSetRegistration) 57 load_var 12 (final_name) 58 init_field 0 (name) 59 load_var 3 (collector) @@ -424,7 +424,7 @@ function testing.TestRegistry.expand_set(self: null, name: string) -> (testing.S 147 68 call 67 (baml.sys.now_ms) 69 store_var 6 (start) - 148 70 alloc_instance 117 (TestCollector) + 148 70 alloc_instance 119 (TestCollector) 71 load_var 2 (name) 72 init_field 0 (prefix) 73 alloc_array 0 @@ -444,7 +444,7 @@ function testing.TestRegistry.expand_set(self: null, name: string) -> (testing.S 85 bin_op - 86 store_var 8 (elapsed) - 151 87 alloc_instance 116 (TestRegistry) + 151 87 alloc_instance 118 (TestRegistry) 152 88 load_var 7 (sub_collector) 89 init_field 0 (collector) @@ -469,7 +469,7 @@ function testing.TestRegistry.expand_set(self: null, name: string) -> (testing.S } function testing.TestRegistry.new(collector: testing.TestCollector) -> testing.TestRegistry { - 87 0 alloc_instance 116 (TestRegistry) + 87 0 alloc_instance 118 (TestRegistry) 1 load_var 1 (collector) 2 init_field 0 (collector) @@ -630,7 +630,7 @@ function testing.TestRegistry.serialize(self: null) -> (testing.SerializedTest | 47 pop_jump_if_false +30 (to 77) 115 48 load_var 3 (items) - 49 alloc_instance 110 (SerializedTest) + 49 alloc_instance 112 (SerializedTest) 50 load_const 3 ("lazyTestSet") 51 init_field 0 (type) 52 load_var 9 (ts) @@ -652,7 +652,7 @@ function testing.TestRegistry.serialize(self: null) -> (testing.SerializedTest | 65 store_var 13 (_26) 108 66 load_var 3 (items) - 67 alloc_instance 108 (SerializedTestSet) + 67 alloc_instance 110 (SerializedTestSet) 68 load_var 12 (_25) 69 init_field 0 (name) 70 load_var 13 (_26) @@ -676,7 +676,7 @@ function testing.TestRegistry.serialize(self: null) -> (testing.SerializedTest | 85 store_var 6 (t) 102 86 load_var 3 (items) - 87 alloc_instance 110 (SerializedTest) + 87 alloc_instance 112 (SerializedTest) 88 load_const 5 ("test") 89 init_field 0 (type) 90 load_var 6 (t) @@ -698,7 +698,7 @@ function testing.run_test(body: () -> void throws unknown, runner: ((() -> testi 2 store_var 1 173 3 load_var 1 (body) - 4 make_closure 333 1 + 4 make_closure 335 1 5 store_var 3 (base_run) 188 6 load_var 2 (runner) diff --git a/baml_language/crates/baml_tests/tests/exceptions.rs b/baml_language/crates/baml_tests/tests/exceptions.rs index 9d9a38aa25..95ed5c903b 100644 --- a/baml_language/crates/baml_tests/tests/exceptions.rs +++ b/baml_language/crates/baml_tests/tests/exceptions.rs @@ -3013,7 +3013,7 @@ async fn panic_alias_catches_any_panic() { } "# ); - insta::assert_snapshot!(output.bytecode, @r" + insta::assert_snapshot!(output.bytecode, @" function divides() -> int { load_const 1 load_const 0 @@ -3026,7 +3026,7 @@ async fn panic_alias_catches_any_panic() { jump L2 load_var e type_tag - jump_table [L1, L1, L1, _, _, L1, _, _, _, L1, L1, _, L1, L1], default L0 + jump_table [L1, L1, L1, L1, _, _, L1, _, _, _, L1, L1, _, L1, L1], default L0 L0: load_var e @@ -3067,7 +3067,7 @@ async fn panic_alias_plus_wildcard_dispatch() { jump L2 load_var e type_tag - jump_table [L1, L1, L1, _, _, L1, _, _, _, L1, L1, _, L1, L1], default L0 + jump_table [L1, L1, L1, L1, _, _, L1, _, _, _, L1, L1, _, L1, L1], default L0 L0: load_var e diff --git a/baml_language/crates/bex_engine/src/conversion.rs b/baml_language/crates/bex_engine/src/conversion.rs index 74cd30c316..2240df6d64 100644 --- a/baml_language/crates/bex_engine/src/conversion.rs +++ b/baml_language/crates/bex_engine/src/conversion.rs @@ -4,12 +4,11 @@ //! between the VM representation (`Value`, `Object`) and the external //! representation (`BexValue`, `BexExternalValue`). -use ::bex_vm_types::ObjectType; +use ::bex_heap::{BexValue, HeapPermit, PermitProof, TlabHolder}; +use ::bex_vm_types::{HeapPtr, Object, ObjectType, RootHaver, Value}; use baml_type::Literal; use bex_external_types::{BexExternalAdt, BexExternalValue, Ty, UnionMetadata}; -use bex_heap::{ActiveHeapPermit, BexValue, PermitProof}; use bex_vm::BexVm; -use bex_vm_types::{HeapPtr, Object, Value}; use crate::{BexEngine, EngineError}; @@ -194,6 +193,9 @@ impl BexEngine { Object::Future(_) => Err(EngineError::CannotConvert { type_name: "future".to_string(), }), + Object::UnscheduledFuture(_) => Err(EngineError::CannotConvert { + type_name: "unscheduled_future".to_string(), + }), Object::Collector(c) => Ok(BexExternalValue::Adt(BexExternalAdt::Collector(c.clone()))), Object::Type(ty) => Ok(BexExternalValue::Adt(BexExternalAdt::Type((**ty).clone()))), Object::Uint8Array(bytes) => Ok(BexExternalValue::Uint8Array(bytes.clone())), @@ -227,37 +229,43 @@ impl BexEngine { impl BexEngine { /// Convert a `BexExternalValue` result from sys ops back to a VM Value. - pub(crate) fn convert_external_to_vm_value( + pub(crate) fn convert_external_to_vm_value( &self, - vm: &mut ActiveHeapPermit, + holder: &mut impl HeapPermit, external: BexExternalValue, ) -> Value { match external { BexExternalValue::Handle(handle) => Value::Object( - self.resolve_handle(vm.proof(), &handle) + self.resolve_handle(holder.proof(), &handle) .expect("Handle should be valid - object was returned to external code"), ), BexExternalValue::Null => Value::Null, BexExternalValue::Int(i) => Value::Int(i), BexExternalValue::Float(f) => Value::Float(f), BexExternalValue::Bool(b) => Value::Bool(b), - BexExternalValue::String(s) => vm.alloc_string(s), + BexExternalValue::String(s) => { + Value::Object(holder.holder_mut().tlab_mut().alloc_string(s)) + } BexExternalValue::Array { items, .. } => { let values: Vec = items .into_iter() - .map(|v| self.convert_external_to_vm_value(vm, v)) + .map(|v| self.convert_external_to_vm_value(holder, v)) .collect(); - vm.alloc_array(values) + Value::Object(holder.holder_mut().tlab_mut().alloc_array(values)) } BexExternalValue::Map { entries, .. } => { let values: indexmap::IndexMap = entries .into_iter() - .map(|(k, v)| (k, self.convert_external_to_vm_value(vm, v))) + .map(|(k, v)| (k, self.convert_external_to_vm_value(holder, v))) .collect(); - vm.alloc_map(values) + Value::Object(holder.holder_mut().tlab_mut().alloc_map(values)) + } + BexExternalValue::Uint8Array(bytes) => { + Value::Object(holder.holder_mut().tlab_mut().alloc_uint8array(bytes)) + } + BexExternalValue::RustData(data) => { + Value::Object(holder.holder_mut().tlab_mut().alloc_rust_data(data)) } - BexExternalValue::Uint8Array(bytes) => vm.alloc_uint8array(bytes), - BexExternalValue::RustData(data) => vm.alloc_rust_data(data), // Allocate instance by looking up class and converting fields BexExternalValue::Instance { class_name, fields } => { let class_ptr = self @@ -280,9 +288,14 @@ impl BexEngine { let ext = fields.get(&class_field.name).unwrap_or_else(|| { panic!("missing field '{}' in Instance", class_field.name) }); - values.push(self.convert_external_to_vm_value(vm, ext.clone())); + values.push(self.convert_external_to_vm_value(holder, ext.clone())); } - vm.alloc_instance(*class_ptr, values) + Value::Object( + holder + .holder_mut() + .tlab_mut() + .alloc_instance(*class_ptr, values), + ) } BexExternalValue::Variant { enum_name, @@ -306,11 +319,22 @@ impl BexEngine { .unwrap_or_else(|| { panic!("Variant '{variant_name}' not found in enum '{enum_name}'") }); - vm.alloc_variant(*enum_ptr, index) + Value::Object( + holder + .holder_mut() + .tlab_mut() + .alloc_variant(*enum_ptr, index), + ) + } + BexExternalValue::Union { value, .. } => { + self.convert_external_to_vm_value(holder, *value) + } + BexExternalValue::Adt(BexExternalAdt::Collector(c)) => { + Value::Object(holder.holder_mut().tlab_mut().alloc_collector(c)) + } + BexExternalValue::Adt(BexExternalAdt::Type(ty)) => { + Value::Object(holder.holder_mut().tlab_mut().alloc_type(ty)) } - BexExternalValue::Union { value, .. } => self.convert_external_to_vm_value(vm, *value), - BexExternalValue::Adt(BexExternalAdt::Collector(c)) => vm.alloc_collector(c), - BexExternalValue::Adt(BexExternalAdt::Type(ty)) => vm.alloc_type(ty), BexExternalValue::Adt(BexExternalAdt::PromptAst(_)) => { panic!("PromptAst values cannot be converted to VM values yet") } @@ -318,14 +342,14 @@ impl BexEngine { panic!("Media values cannot be converted to VM values yet") } BexExternalValue::FunctionRef { global_index } => { - let idx = bex_vm_types::GlobalIndex::from_raw(global_index); + let _idx = bex_vm_types::GlobalIndex::from_raw(global_index); assert!( - (global_index < vm.globals.len()), + (global_index < self.globals.len()), "FunctionRef global_index {} out of bounds (globals len {})", global_index, - vm.globals.len() + self.globals.len() ); - vm.globals[idx] + self.globals[global_index] } } } @@ -595,6 +619,7 @@ fn find_matching_union_member<'a>(value: &Value, members: &'a [Ty]) -> Option<&' | Object::Class(_) | Object::Enum(_) | Object::Future(_) + | Object::UnscheduledFuture(_) | Object::RustData(_) | Object::Collector(_) | Object::Type(_) => None, @@ -692,6 +717,7 @@ pub(crate) fn vm_arg_to_external(vm: &BexVm, value: &Value) -> BexExternalValue | Object::Class(_) | Object::Enum(_) | Object::Future(_) + | Object::UnscheduledFuture(_) | Object::RustData(_) | Object::Collector(_) | Object::Type(_) => { diff --git a/baml_language/crates/bex_engine/src/future.rs b/baml_language/crates/bex_engine/src/future.rs new file mode 100644 index 0000000000..9a714a6107 --- /dev/null +++ b/baml_language/crates/bex_engine/src/future.rs @@ -0,0 +1,526 @@ +//! Future tracking for the Bex engine. +//! +//! # Lifecycle +//! +//! An entry exists in [`FutureManagerInner::active_futures`] **if** the +//! corresponding [`bex_vm_types::Future`] heap object is in the `Pending` +//! state, **or** in the `InternalError` state (which is leaked by design, +//! see below). +//! +//! - `fulfill_future`, `err_future`, `cancel_future`: terminal transitions +//! that update the heap object, signal the cross-task ready notification, +//! and remove the entry, all in one critical section. +//! - `internal_error_future`: terminal transition for unrecoverable engine +//! errors that does **not** remove the entry. The entry's `SetOnce` keeps +//! the original [`EngineError`] so a later VM `Await` can yield back to +//! the engine, which surfaces the error to the host. This intentionally +//! leaks the entry — internal errors should never happen in correct +//! programs, and the leak buys us never losing the underlying error +//! context to a removal/race window. +//! +//! Why this is safe: +//! +//! - The heap object alone is sufficient to drive the VM's `Await` +//! instruction after it resumes; the engine's "ready" future does not +//! carry the value, it is purely a "you may proceed" signal. +//! - All operations on a [`FutureManagerGuard`] hold an exclusive +//! [`SharedHeapPermitGuard`], so terminal-transition-then-remove is +//! atomic with respect to any other [`FutureManagerGuard`] operation +//! (notably [`FutureManagerGuard::future_ready`]). +//! - Existing `Arc>` clones held by waiters keep +//! working after the entry is dropped — removal only releases the +//! manager's own `Arc`. +//! +//! Why this works for fire-and-forget: while pending, the [`FutureState`] +//! roots the heap object via [`RootHaver::collect_roots`]. After the +//! sys-op task completes the entry is removed (or, for `InternalError`, +//! retained); if no VM stack still references the heap object and the +//! entry has been removed, it is correctly reclaimed by the next GC. +//! +//! Why this works for the await race on `Pending` → `Ready/Error/Cancelled` +//! (a VM observes `Pending(future_id)` on the heap, yields +//! `Await(future_id)`, and the engine completes the future before the event +//! loop calls [`FutureManagerGuard::future_ready`]): +//! [`FutureManagerGuard::future_ready`] treats a missing-but-previously- +//! issued `FutureId` as "already resolved" and returns an immediate +//! `Ok(())`. The VM re-executes the saved `Await` instruction, reads the +//! terminal state directly from the heap, and proceeds. +//! +//! For `InternalError`, no race is possible: the entry is never removed, so +//! `future_ready` always finds a waiter that yields the original error. + +use ::bex_heap::{ + HeapPermit, PermitProof, SharedHeapPermit, SharedHeapPermitGuard, Tlab, TlabHolder, +}; +use ::bex_vm_types::{ + HeapPtr, Object, ObjectType, RootHaver, Value, + types::{FutureId, FutureType}, +}; +use ::core::sync::atomic::AtomicUsize; +use ::std::{collections::HashMap, sync::Arc}; +use ::sys_types::CancellationToken; + +use crate::EngineError; + +/// Manages all futures for the Bex engine. +/// +/// This is a shared resource managed using a [`SharedHeapPermit`]. +pub struct FutureManager { + inner: SharedHeapPermit, +} + +impl FutureManager { + pub fn new(inner: SharedHeapPermit) -> Self { + Self { inner } + } + pub async fn acquire(&self) -> FutureManagerGuard<'_> { + FutureManagerGuard { + inner: self.inner.acquire().await, + } + } + + /// Number of `Pending` futures currently tracked. Acquires the manager. + pub async fn active_future_count(&self) -> usize { + self.inner.acquire().await.active_future_count() + } +} + +pub struct FutureManagerGuard<'a> { + inner: SharedHeapPermitGuard<'a, FutureManagerInner>, +} + +impl FutureManagerGuard<'_> { + /// Number of `Pending` futures currently tracked by the manager. + pub fn active_future_count(&self) -> usize { + self.inner.active_future_count() + } + + /// Registers a future with the future manager and returns a unique ID. + pub fn new_future(&mut self, cancel: CancellationToken) -> (FutureId, HeapPtr) { + // The contract on `FutureId::from_usize` is "no two live ids share a + // usize". We satisfy this by drawing the value from the manager's + // monotonic `AtomicUsize`; uniqueness is preserved as long as the + // counter hasn't wrapped (which would take 2^64 calls). + let id = self + .inner + .next_future_id + .fetch_add(1, std::sync::atomic::Ordering::Relaxed); + let id = FutureId::from_usize(id); + + let ptr = self + .inner + .tlab + .alloc_future(::bex_vm_types::Future::Pending(id)); + + let future_state = FutureState { + future: ptr, + ready: Arc::new(tokio::sync::SetOnce::new()), + cancel, + }; + self.inner.active_futures.insert(id, future_state); + (id, ptr) + } + pub fn fulfill_future(&mut self, id: FutureId, value: Value) -> Result<(), EngineError> { + self.complete_pending(id, bex_vm_types::Future::Ready(value), Ok(()))?; + Ok(()) + } + pub fn err_future(&mut self, id: FutureId, err: Value) -> Result<(), EngineError> { + self.complete_pending(id, bex_vm_types::Future::Error(err), Ok(()))?; + Ok(()) + } + pub fn cancel_future(&mut self, id: FutureId) -> Result<(), EngineError> { + let entry = self.complete_pending(id, bex_vm_types::Future::Cancelled, Ok(()))?; + // The token is still cloned by the spawned sys-op task; firing it + // here unparks that task even though `entry` itself is about to be + // dropped. + entry.cancel.cancel(); + Ok(()) + } + /// Sets the future to `InternalError` and notifies the waiter. + /// + /// Unlike the other terminal-transition helpers, this does **not** remove + /// the entry from `active_futures`. The originating `EngineError` is + /// preserved on the entry's `SetOnce` so a later VM `Await` (which yields + /// `Await(future_id)` even for an InternalError-state heap object) can + /// surface the original error from this method's caller. Internal errors + /// are bugs that "should never happen", so the resulting permanent leak is + /// acceptable in exchange for not losing the underlying error context. + pub fn internal_error_future( + &mut self, + id: FutureId, + err: EngineError, + ) -> Result<(), EngineError> { + let entry = self + .inner + .active_futures + .get_mut(&id) + .ok_or(EngineError::FutureNotFound { future_id: id })?; + // SAFETY: the `FutureManagerGuard` holds an exclusive heap permit. + let fut = unsafe { entry.get_mut() }?; + debug_assert!( + matches!(fut, bex_vm_types::Future::Pending(_)), + "internal_error_future called on non-Pending future {id:?}; \ + invariant violated" + ); + *fut = bex_vm_types::Future::InternalError(id); + let set = entry.ready.set(Err(err)); + debug_assert!( + set.is_ok(), + "Should not have been ready if the heap future was pending." + ); + Ok(()) + } + + /// Atomically transition a `Pending` future to a terminal state, signal + /// its [`tokio::sync::SetOnce`] waiter, and remove the entry from + /// `active_futures`. The dropped [`FutureState`] is returned so callers + /// (e.g. [`Self::cancel_future`]) can perform additional Drop-time work + /// like firing a [`CancellationToken`] clone before it is released. + /// + /// # Invariant + /// Only `fulfill_future`, `err_future`, and `cancel_future` route through + /// this helper. `internal_error_future` deliberately does **not** — its + /// entries are leaked to preserve the original error on the `SetOnce` and + /// to let the VM yield back to the engine for surfacing. The + /// `debug_assert` below encodes that invariant: if `complete_pending` ever + /// observes a non-`Pending` heap state, a caller has violated the + /// removal/transition contract. + fn complete_pending( + &mut self, + id: FutureId, + new_state: bex_vm_types::Future, + result: Result<(), EngineError>, + ) -> Result { + let mut entry = self + .inner + .active_futures + .remove(&id) + .ok_or(EngineError::FutureNotFound { future_id: id })?; + // SAFETY: the `FutureManagerGuard` holds an exclusive heap permit. + let fut = unsafe { entry.get_mut() }?; + debug_assert!( + matches!(fut, bex_vm_types::Future::Pending(_)), + "complete_pending called with non-Pending heap state for {id:?} \ + (actual: {:?}); invariant violated — only fulfill/err/cancel may \ + route through this helper", + FutureType::of(fut) + ); + *fut = new_state; + let set = entry.ready.set(result); + debug_assert!( + set.is_ok(), + "Should not have been ready if the heap future was pending." + ); + Ok(entry) + } + /// Returns a Rust future that resolves when the BAML future is ready. + /// Once it is resolved, the future on the heap will be in a terminal + /// state (some variant other than `Pending`). + /// + /// A `FutureId` that is missing from `active_futures` but has been + /// previously issued (i.e. `id.as_usize() < next_future_id`) is treated + /// as already resolved: the entry was dropped by the terminal-transition + /// helper after the heap object was set. The VM's `Await` re-execution + /// reads the terminal state directly from the heap. See the module-level + /// docs for the full lifecycle invariant. + /// + /// ## Errors + /// - Synchronous `EngineError::FutureNotFound` for a `FutureId` that was + /// never issued by this manager. + /// - The returned future yields `EngineError` if the future produced an + /// `InternalError`. + pub fn future_ready( + &self, + id: FutureId, + ) -> Result> + use<>, EngineError> { + let waiter = match self.inner.active_futures.get(&id) { + Some(future) => Some(Arc::clone(&future.ready)), + None => { + let next = self + .inner + .next_future_id + .load(std::sync::atomic::Ordering::Relaxed); + if id.as_usize() >= next { + return Err(EngineError::FutureNotFound { future_id: id }); + } + None + } + }; + Ok(async move { + match waiter { + Some(w) => w.wait().await.clone(), + None => Ok(()), + } + }) + } +} +impl TlabHolder for FutureManagerGuard<'_> { + fn tlab(&self) -> &Tlab { + self.inner.tlab() + } + fn tlab_mut(&mut self) -> &mut Tlab { + self.inner.tlab_mut() + } +} +impl HeapPermit for FutureManagerGuard<'_> { + fn holder(&self) -> &FutureManagerInner { + &self.inner + } + fn holder_mut(&mut self) -> &mut FutureManagerInner { + &mut self.inner + } + fn proof(&self) -> PermitProof<'_> { + self.inner.proof() + } +} + +pub struct FutureManagerInner { + tlab: Tlab, + next_future_id: AtomicUsize, + active_futures: HashMap, +} +impl FutureManagerInner { + pub fn new(tlab: Tlab) -> Self { + Self { + tlab, + next_future_id: AtomicUsize::new(0), + active_futures: HashMap::new(), + } + } + + /// Number of `Pending` futures currently tracked by the manager. This is + /// the same as the number of futures whose heap object is in + /// `Future::Pending(_)`. Intended for tests and telemetry. + pub fn active_future_count(&self) -> usize { + self.active_futures.len() + } +} +impl RootHaver for FutureManagerInner { + fn collect_roots(&self, roots: &mut Vec) { + // blocking is fine since we should only ever call this while holding exclusive heap access + for future in self.active_futures.values() { + future.collect_roots(roots); + } + } + fn forward_roots(&mut self, roots: &HashMap) { + for future in self.active_futures.values_mut() { + future.forward_roots(roots); + } + } +} +impl TlabHolder for FutureManagerInner { + fn tlab(&self) -> &Tlab { + &self.tlab + } + fn tlab_mut(&mut self) -> &mut Tlab { + &mut self.tlab + } +} + +struct FutureState { + future: HeapPtr, + /// Set once the `Future` object is no longer `Pending` + /// - `Ok(())` means there is a BAML value ready on the heap + /// - `Err(err)` means it's `InternalError` and `err` is the error value + ready: Arc>>, + pub cancel: CancellationToken, +} +impl FutureState { + /// SAFETY: We must hold a heap permit for the duration of the future object. + unsafe fn get_mut(&mut self) -> Result<&mut bex_vm_types::Future, EngineError> { + // SAFETY: We hold a permit, so we can access the future object. + let obj = unsafe { self.future.get_mut() }; + match obj { + Object::Future(fut) => Ok(fut), + other => Err(EngineError::TypeMismatch { + message: format!("Expected Future, got {:?}", ObjectType::of(other)), + }), + } + } +} +impl RootHaver for FutureState { + fn collect_roots(&self, roots: &mut Vec) { + roots.push(self.future); + } + fn forward_roots(&mut self, roots: &HashMap) { + if let Some(new_result) = roots.get(&self.future) { + self.future = *new_result; + } + } +} + +#[cfg(test)] +mod tests { + use ::bex_heap::{BexHeap, HeapPermitManager}; + use ::bex_vm_types::Value; + + use super::*; + + async fn make_manager() -> FutureManager { + let heap = BexHeap::new(Vec::new()); + let permit_manager = Arc::new(HeapPermitManager::new()); + let permit = permit_manager + .new_permit(FutureManagerInner::new(Tlab::new_empty(Arc::clone(&heap)))) + .await; + FutureManager::new(SharedHeapPermit::new(permit)) + } + + #[tokio::test] + async fn fulfill_removes_entry() { + let mgr = make_manager().await; + let mut guard = mgr.acquire().await; + let (id, _ptr) = guard.new_future(CancellationToken::new()); + assert_eq!(guard.active_future_count(), 1); + guard.fulfill_future(id, Value::Int(42)).unwrap(); + assert_eq!(guard.active_future_count(), 0); + } + + #[tokio::test] + async fn cancel_removes_entry_and_fires_token() { + let mgr = make_manager().await; + let mut guard = mgr.acquire().await; + let token = CancellationToken::new(); + let (id, _ptr) = guard.new_future(token.clone()); + assert!(!token.is_cancelled()); + guard.cancel_future(id).unwrap(); + assert_eq!(guard.active_future_count(), 0); + assert!(token.is_cancelled()); + } + + #[tokio::test] + async fn err_removes_entry_but_internal_error_leaks() { + // `err_future` is a normal terminal transition and removes its entry. + // `internal_error_future` deliberately leaks so the original error stays + // pinned to the registry's `SetOnce` for surfacing through a later + // VM `Await(future_id)` yield. + let mgr = make_manager().await; + let mut guard = mgr.acquire().await; + let (id_a, _) = guard.new_future(CancellationToken::new()); + let (id_b, _) = guard.new_future(CancellationToken::new()); + assert_eq!(guard.active_future_count(), 2); + guard.err_future(id_a, Value::Int(7)).unwrap(); + guard + .internal_error_future( + id_b, + EngineError::TypeMismatch { + message: "boom".into(), + }, + ) + .unwrap(); + // Only `id_a` was removed; `id_b` is the leaked InternalError entry. + assert_eq!(guard.active_future_count(), 1); + } + + #[tokio::test] + async fn internal_error_future_preserves_original_error_via_future_ready() { + // The whole point of leaking the entry: `future_ready` must hand back + // the original `EngineError` so the engine can re-throw it to the + // host. This is the key correctness path that the H1+H2 unification + // restored — without the leak, a race window would collapse the error + // to `VmInternalError::AwaitedFutureInternalError`. + let mgr = make_manager().await; + let mut guard = mgr.acquire().await; + let (id, _) = guard.new_future(CancellationToken::new()); + let original = EngineError::TypeMismatch { + message: "synthetic op error".into(), + }; + guard.internal_error_future(id, original.clone()).unwrap(); + // Entry should still be live (deliberate leak). + assert_eq!(guard.active_future_count(), 1); + + // A waiter on the leaked entry resolves to the original error. + let waiter = guard.future_ready(id).expect("waiter should be created"); + drop(guard); + let surfaced = waiter.await.expect_err("InternalError should propagate"); + assert_eq!(surfaced, original); + } + + #[tokio::test] + async fn fulfill_after_internal_error_is_disallowed() { + // Once an entry is in the leaked InternalError state, none of the + // terminal-transition helpers (fulfill/err/cancel) should subsequently + // succeed against it: the `complete_pending` invariant is "entry exists + // and heap is Pending". In debug builds the debug_assert fires; in + // release builds the heap state is non-Pending and the call still + // produces a `FutureNotFound`-or-equivalent error path. + // + // We exercise the release-build path by skipping under + // `cfg(debug_assertions)` — the panic there is the documented + // intent of the invariant. + if cfg!(debug_assertions) { + return; + } + let mgr = make_manager().await; + let mut guard = mgr.acquire().await; + let (id, _) = guard.new_future(CancellationToken::new()); + guard + .internal_error_future( + id, + EngineError::TypeMismatch { + message: "stale".into(), + }, + ) + .unwrap(); + // The release-build path: `fulfill_future`'s `complete_pending` happily + // overwrites the heap to `Ready` and removes the entry. This is + // tolerated rather than guaranteed (the `complete_pending` debug_assert + // fails in debug builds). The point of the test is to pin the + // observable behavior of the leaked state. + let _ = guard.fulfill_future(id, Value::Int(0)); + } + + #[tokio::test] + async fn double_complete_returns_not_found() { + let mgr = make_manager().await; + let mut guard = mgr.acquire().await; + let (id, _) = guard.new_future(CancellationToken::new()); + guard.fulfill_future(id, Value::Int(1)).unwrap(); + let again = guard.fulfill_future(id, Value::Int(2)); + assert!(matches!(again, Err(EngineError::FutureNotFound { .. }))); + } + + #[tokio::test] + async fn future_ready_immediate_for_completed_id() { + let mgr = make_manager().await; + let mut guard = mgr.acquire().await; + let (id, _) = guard.new_future(CancellationToken::new()); + guard.fulfill_future(id, Value::Int(1)).unwrap(); + // Entry is gone; future_ready should treat it as already-resolved. + let waiter = guard.future_ready(id).expect("expected immediate Ok"); + drop(guard); + waiter.await.expect("should resolve to Ok(())"); + } + + #[tokio::test] + async fn future_ready_for_never_issued_id_errors() { + let mgr = make_manager().await; + let guard = mgr.acquire().await; + // No futures have been issued; any non-zero id is bogus. The contract + // on `from_usize` is "no two live ids collide" — for this test we + // construct one that is plainly out of range, which is safe since no + // other id exists for it to collide with. + let bogus = FutureId::from_usize(99); + let result = guard.future_ready(bogus); + assert!(matches!(result, Err(EngineError::FutureNotFound { .. }))); + } + + #[tokio::test] + async fn future_ready_waiter_resolves_after_fulfill() { + // Grab a waiter while the future is still pending; fulfill from a + // separate critical section; then the waiter should resolve. This + // exercises the path where `future_ready` clones the `Arc` + // before the entry is removed. + let mgr = make_manager().await; + let mut guard = mgr.acquire().await; + let (id, _) = guard.new_future(CancellationToken::new()); + let waiter = guard.future_ready(id).expect("waiter should be created"); + drop(guard); + + let mut guard = mgr.acquire().await; + guard.fulfill_future(id, Value::Int(123)).unwrap(); + drop(guard); + + waiter.await.expect("waiter should resolve to Ok(())"); + assert_eq!(mgr.active_future_count().await, 0); + } +} diff --git a/baml_language/crates/bex_engine/src/lib.rs b/baml_language/crates/bex_engine/src/lib.rs index 9ae7576a00..4d7d679c26 100644 --- a/baml_language/crates/bex_engine/src/lib.rs +++ b/baml_language/crates/bex_engine/src/lib.rs @@ -70,14 +70,17 @@ mod conversion; mod function_call_context; +mod future; use std::{ collections::HashMap, sync::{Arc, Mutex, atomic::Ordering}, }; +use ::bex_heap::{HeapPermit as _, Tlab}; // Re-export event types for callers. -use ::bex_vm_types::RootHaver; +use ::bex_vm_types::{RootHaver, types::FutureId}; use ::core::sync::atomic::AtomicBool; +use ::sys_types::OpFuture; use async_trait::async_trait; use bex_events::{EventKind, FunctionEnd, FunctionEvent, FunctionStart, SpanContext}; pub use bex_events::{HostSpanContext, RuntimeEvent, SpanId}; @@ -85,19 +88,25 @@ pub use bex_external_types::{BexExternalValue, Ty, TypeName, UnionMetadata}; use bex_heap::BexHeap; // Re-export GcStats for users of the engine pub use bex_heap::GcStats; -pub use bex_heap::{ActiveHeapPermit, HeapGuard, HeapPermitManager, InactiveHeapPermit}; +pub use bex_heap::{ + ActiveHeapPermit, HeapGuard, HeapPermitManager, InactiveHeapPermit, SharedHeapPermit, + SharedHeapPermitGuard, +}; use bex_vm::{BexVm, SpanNotification, VmExecState}; -use bex_vm_types::{FunctionMeta, FunctionOrigin, GlobalPool, HeapPtr, Object, SysOp, Value}; +use bex_vm_types::{ + FunctionMeta, FunctionOrigin, GlobalPool, HeapPtr, Object, SysOp, Value, VmGlobals, +}; pub use conversion::test_arg_to_external; // Re-export CancellationToken for callers. pub use function_call_context::{FunctionCallContext, FunctionCallContextBuilder}; pub use sys_types::CallId; use sys_types::{OpError, SysOpResult}; use thiserror::Error; -use tokio::sync::mpsc; pub use tokio_util::sync::CancellationToken; use web_time::{Instant, SystemTime}; +pub use crate::future::{FutureManager, FutureManagerGuard, FutureManagerInner}; + // ============================================================================ // Engine Types // ============================================================================ @@ -113,44 +122,6 @@ pub struct UserFunctionInfo { pub return_type: Ty, } -/// Result of an external future. -struct FutureResult { - id: HeapPtr, - result: Result, -} - -/// RAII guard for in-flight async sys-op task abort handles. -/// -/// On drop, aborts all tracked tasks so early returns (`?`) do not leave -/// spawned work running in the background. -struct AbortHandlesGuard { - handles: Vec, -} - -impl AbortHandlesGuard { - fn new() -> Self { - Self { - handles: Vec::new(), - } - } - - fn push(&mut self, handle: futures::future::AbortHandle) { - self.handles.push(handle); - } - - fn abort_all(&self) { - for handle in &self.handles { - handle.abort(); - } - } -} - -impl Drop for AbortHandlesGuard { - fn drop(&mut self) { - self.abort_all(); - } -} - // ============================================================================ // Span Tracking (per-invocation, NOT on Arc) // ============================================================================ @@ -179,11 +150,14 @@ struct SpanState { } /// Errors that can occur during engine execution. -#[derive(Debug, PartialEq, Error)] +#[derive(Debug, PartialEq, Error, Clone)] pub enum EngineError { #[error("Function call with ID {call_id} not found")] FunctionCallNotFound { call_id: CallId }, + #[error("Future with ID {future_id:?} not found")] + FutureNotFound { future_id: FutureId }, + #[error("Function not found: {name}")] FunctionNotFound { name: String }, @@ -218,9 +192,6 @@ pub enum EngineError { #[error("Schema inconsistency: {message}")] SchemaInconsistency { message: String }, - #[error("Operation cancelled")] - Cancelled, - #[cfg(feature = "heap_debug")] #[error("Snapshot not possible for type: {type_name}")] CannotSnapshot { type_name: String }, @@ -259,6 +230,47 @@ fn format_unhandled_throw(value: &BexExternalValue, trace: &[bex_vm::StackFrame] out } +/// Fully-qualified name of the cancellation panic class. +pub const CANCELLED_PANIC_CLASS: &str = "baml.panics.Cancelled"; + +/// True iff `err` is an unhandled `baml.panics.Cancelled` panic. +/// +/// Centralizes the cancellation-classification logic that bridges (`bridge_cffi`, +/// `bridge_nodejs`, `bridge_python`, `bridge_wasm`) and `baml_lsp_server` need +/// for mapping `EngineError` → host-specific cancellation indicator. +pub fn is_cancelled_engine_error(err: &EngineError) -> bool { + matches!( + err, + EngineError::UnhandledThrow { value, .. } + if matches!( + value.as_ref(), + BexExternalValue::Instance { class_name, .. } + if class_name == CANCELLED_PANIC_CLASS + ) + ) +} + +/// Synthesize an `EngineError::UnhandledThrow` representing a cancellation. +/// +/// Used when the engine produces a cancellation outside an active VM (pre-call +/// fail-fast and post-completion "cancel wins" race). Mirrors the shape of a +/// `baml.panics.Cancelled` instance produced by the VM's `Await` opcode so +/// host bridges can detect both cases by inspecting `class_name`. +fn cancelled_unhandled_throw() -> EngineError { + let mut fields = indexmap::IndexMap::new(); + fields.insert( + "message".to_string(), + BexExternalValue::String("operation cancelled".to_string()), + ); + EngineError::UnhandledThrow { + value: Box::new(BexExternalValue::Instance { + class_name: CANCELLED_PANIC_CLASS.to_string(), + fields, + }), + trace: Vec::new(), + } +} + // ============================================================================ // BexEngine // ============================================================================ @@ -328,8 +340,12 @@ fn format_unhandled_throw(value: &BexExternalValue, trace: &[bex_vm::StackFrame] pub struct BexEngine { /// The unified heap (shared across all VM instances) heap: Arc, - /// Global variables pool - globals: GlobalPool, + /// Frozen global variables shared across every post-`$init` VM. + /// + /// Populated once during `$init` and immutable thereafter; cloning is a + /// cheap refcount bump (see `VmGlobals::Shared`). The VM rejects any + /// `StoreGlobal` against this view as a `VmInternalError`. + globals: Arc<[Value]>, /// Resolved function/class/enum names for lookup resolved_function_names: HashMap, /// Resolved class names for instance allocation (`IndexMap` preserves definition order) @@ -360,6 +376,8 @@ pub struct BexEngine { /// Map of active function calls by ID. active_calls: Mutex>, + + futures: FutureManager, } #[cfg(target_arch = "wasm32")] @@ -475,7 +493,9 @@ impl BexEngine { .into_iter() .map(|cv| cv.to_value(|idx| heap.compile_time_ptr(idx.into_raw()))) .collect(); - let mut globals = GlobalPool::from_vec(globals_vec); + // Mutable during `$init` so `StoreGlobal` can populate top-level let + // bindings; frozen into `Arc<[Value]>` once `$init` finishes (see below). + let mut globals_pool = GlobalPool::from_vec(globals_vec); #[cfg(not(target_arch = "wasm32"))] let park_requested = Arc::new(AtomicBool::new(false)); @@ -488,7 +508,7 @@ impl BexEngine { if let Some((init_ptr, _kind)) = resolved_function_names.get(init_name.as_str()) { let mut vm = BexVm::new( Arc::clone(&heap), - globals.clone(), + VmGlobals::Owned(globals_pool.clone()), resolved_class_names .iter() .map(|(k, v)| (k.clone(), *v)) @@ -506,7 +526,12 @@ impl BexEngine { Ok(VmExecState::Complete(_)) => { // Extract the (potentially mutated) global pool back // so StoreGlobal writes are visible to subsequent calls. - globals = vm.globals; + globals_pool = match vm.globals { + VmGlobals::Owned(pool) => pool, + VmGlobals::Shared(_) => { + unreachable!("$init VM constructed with Owned globals") + } + }; break; } Ok(VmExecState::Notify(_) | VmExecState::SpanNotify(_)) => { @@ -535,6 +560,12 @@ impl BexEngine { } } + // Freeze the now-populated globals into a shared `Arc<[Value]>`. Every + // post-`$init` VM cloned into a `call_function` invocation reads from + // this exact `Arc`. `StoreGlobal` against this shared view is rejected + // by the VM as `VmInternalError::StoreGlobalAfterInit`. + let globals: Arc<[Value]> = Arc::from(globals_pool.0); + // Build SysOpContext by pre-extracting LLM function metadata from the heap. // This avoids passing raw HeapPtrs to sys_ops. let llm_functions = Self::extract_llm_function_info(&resolved_function_names); @@ -544,6 +575,11 @@ impl BexEngine { let enum_definitions = Self::extract_enum_definitions(&resolved_enum_names); let heap_permit_manager = Arc::new(HeapPermitManager::new()); + // we just created the permit manager so this will never block + let futures_permit = futures::executor::block_on( + heap_permit_manager + .new_permit(FutureManagerInner::new(Tlab::new_empty(Arc::clone(&heap)))), + ); // Build a default RuntimeIo from the SysOps table with an empty context. // This is replaced per-call in execute_sys_op with a live context that @@ -581,6 +617,7 @@ impl BexEngine { #[cfg(not(target_arch = "wasm32"))] park_requested, active_calls: Mutex::new(HashMap::new()), + futures: FutureManager::new(SharedHeapPermit::new(futures_permit)), }) } @@ -712,6 +749,13 @@ impl BexEngine { &self.heap_permit_manager } + /// Number of currently `Pending` futures tracked by the engine. Useful + /// for telemetry and tests that verify the future manager cleans up + /// completed futures. + pub async fn active_future_count(&self) -> usize { + self.futures.active_future_count().await + } + /// Resolve a [`bex_external_types::Handle`] to its current [`HeapPtr`]. /// /// The permit parameter proves GC cannot run while the caller is using the @@ -818,15 +862,17 @@ impl BexEngine { copy_objects: bool, ) -> Result { // Fail fast if already cancelled — guarantees pre-cancelled tokens - // always produce Err(Cancelled) regardless of function contents. + // always produce a `baml.panics.Cancelled` panic regardless of + // function contents. if cancel.is_cancelled() { - return Err(EngineError::Cancelled); + return Err(cancelled_unhandled_throw()); } - // Create VM with shared heap (each VM gets its own TLAB) + // Create VM with shared heap (each VM gets its own TLAB). + // Globals are shared as a frozen `Arc<[Value]>` — cloning is a refcount bump. let vm = BexVm::new( Arc::clone(&self.heap), - self.globals.clone(), + VmGlobals::Shared(Arc::clone(&self.globals)), self.resolved_class_names .iter() .map(|(k, v)| (k.clone(), *v)) @@ -938,8 +984,10 @@ impl BexEngine { // active_calls cleanup is done by ActiveCallGuard on drop. // - // Keep genuine engine errors intact. Cancellation is surfaced directly - // by engine safepoints as `EngineError::Cancelled`. + // Keep genuine engine errors intact. Cancellation is surfaced as a + // `baml.panics.Cancelled` panic — either raised by the VM's `Await` + // opcode, or synthesized by engine safepoints (see + // `cancelled_unhandled_throw`). } /// Cancel a function call by its ID. @@ -949,8 +997,8 @@ impl BexEngine { /// is unknown, this will return an error. pub fn cancel_function_call(&self, call_id: CallId) -> Result<(), EngineError> { let mut active_calls = self.active_calls.lock().unwrap(); - if let Some(cancel) = active_calls.remove(&call_id) { - cancel.cancel(); + if let Some(call) = active_calls.remove(&call_id) { + call.cancel(); Ok(()) } else { Err(EngineError::FunctionCallNotFound { call_id }) @@ -1196,21 +1244,6 @@ impl BexEngine { } } - /// Engine-level cancellation safepoint. - /// - /// Keeps cancellation handling centralized in the engine loop instead of - /// requiring individual BAML code paths or `sys_ops` to be cancel-aware. - fn cancellation_safepoint( - cancel: &CancellationToken, - abort_handles: &AbortHandlesGuard, - ) -> Result<(), EngineError> { - if cancel.is_cancelled() { - abort_handles.abort_all(); - return Err(EngineError::Cancelled); - } - Ok(()) - } - /// Drive the VM to completion, dispatching sys-ops, awaits, span /// notifications, and early-yield events. /// @@ -1229,31 +1262,7 @@ impl BexEngine { cancel: &CancellationToken, copy_objects: bool, ) -> Result { - let (pending_futures, mut processed_futures) = mpsc::unbounded_channel::(); - // Abort handles for spawned async tasks. - // - // Cancellation design: the engine checks cancellation at centralized - // safepoints (VM loop boundaries + ScheduleFuture boundaries), and uses - // a biased `tokio::select!` while waiting at `Await`. This keeps - // cancellation in the engine, so individual sys_ops don't need to be - // cancellation-aware. Without abort handles, async sys-op tasks would - // continue as orphans after cancellation until they complete naturally. - // For long-running ops (HTTP requests, multi-second sleeps), that - // wastes real resources. - // - // Rather than making individual sys_ops cancel-aware (wrapping each in - // its own `tokio::select!`), we store abort handles here and kill all - // spawned tasks when cancellation fires. This keeps sys_op - // implementations simple — new sys_ops never need to think about - // cancellation. - // - // We use `futures::future::AbortHandle` (not `tokio::task::AbortHandle`) - // so the same mechanism works on both native and WASM targets. - let mut abort_handles = AbortHandlesGuard::new(); - - 'vm_exec: loop { - Self::cancellation_safepoint(cancel, &abort_handles)?; - + loop { // Update the VM's span context so native functions can read it. vm.current_span_context = span_state.as_ref().map(Self::build_span_context_from_state); @@ -1289,15 +1298,12 @@ impl BexEngine { match exec_result { VmExecState::Complete(value) => { // "Cancel wins" semantics: if cancellation races with a - // completed VM step, report `Cancelled` rather than - // returning a success value. + // completed VM step, report a cancellation panic rather + // than returning a success value. // // Still emit FunctionEnd first so tracing consumers see // a paired root FunctionStart/FunctionEnd span. let cancelled = cancel.is_cancelled(); - if cancelled { - abort_handles.abort_all(); - } // Emit FunctionEnd for the root entry-point span if tracing if let Some(state) = span_state.as_mut() { @@ -1328,7 +1334,7 @@ impl BexEngine { } if cancelled { - return Err(EngineError::Cancelled); + return Err(cancelled_unhandled_throw()); } // When copy_objects: false and the result is a heap object, @@ -1351,113 +1357,90 @@ impl BexEngine { ); } - VmExecState::ScheduleFuture(id) => { - let pending = vm - .pending_future(id) + VmExecState::ScheduleFuture(unscheduled) => { + // Extract data from unscheduled future + let unscheduled = vm + .unscheduled_future(unscheduled) .map_err(EngineError::VmInternalError)?; - - // Convert arguments to BexExternalValue - let args: Vec = pending + let args: Vec = unscheduled .args .iter() .map(|v| self.vm_arg_to_bex_value(v)) .collect(); + let operation = unscheduled.operation; + + // Setup future + // TODO: detached cancellation + let future_cancel = cancel.child_token(); + let mut future_permit = self.futures.acquire().await; + let (future_id, future_ptr) = future_permit.new_future(future_cancel.clone()); + vm.stack.push(Value::Object(future_ptr)); + if future_cancel.is_cancelled() { + // Early cancellation-- don't need to even start the future. + future_permit.cancel_future(future_id)?; + drop(future_permit); + continue; + } + drop(future_permit); - Self::cancellation_safepoint(cancel, &abort_handles)?; + // Execute sys_op let sys_op_result = - self.execute_sys_op(pending.operation, &args, call_id, cancel, vm.proof()); - Self::cancellation_safepoint(cancel, &abort_handles)?; + self.execute_sys_op(operation, &args, call_id, cancel, vm.proof()); match sys_op_result { SysOpResult::Ready(result) => { - // Guard the "commit to VM state" boundary. - Self::cancellation_safepoint(cancel, &abort_handles)?; - - // Sync operation - set future to Ready without touching stack. - // The VM will continue to the Await instruction which will - // extract the value from the Ready future. - let result = result.map_err(EngineError::from)?; - let value = self.convert_external_to_vm_value(&mut vm, result); - - vm.set_future_ready(id, value) - .map_err(EngineError::VmInternalError)?; + // We may be blocked by other threads doing futures stuff, + // but since we hold our VM permit it should never be blocked by GC. + let mut future_permit = self.futures.acquire().await; + // Cancellation safepoint: if cancellation fires between the + // pre-exec early check and the commit, surface it as a Cancelled + // future rather than fulfilling. The VM's next `Await` will throw + // `baml.panics.Cancelled`. + if cancel.is_cancelled() { + future_permit.cancel_future(future_id)?; + drop(future_permit); + continue; + } + match result { + Ok(external) => { + let value = self + .convert_external_to_vm_value(&mut future_permit, external); + future_permit.fulfill_future(future_id, value)?; + } + Err(op_err) => { + // Route sync sys-op errors through `internal_error_future` + // so the original error is preserved on the registry's + // `SetOnce`. The VM's subsequent `Await` yields back to + // this loop's `Await` branch, where `future_ready` returns + // the original error to the caller via `?`. + future_permit.internal_error_future( + future_id, + EngineError::from(op_err), + )?; + } + } + drop(future_permit); } SysOpResult::Async(fut) => { - // Guard the "spawn side effect" boundary. - Self::cancellation_safepoint(cancel, &abort_handles)?; - - // Async operation — wrap in Abortable and spawn. - let pending_futures = pending_futures.clone(); - let (abort_handle, abort_reg) = - futures::future::AbortHandle::new_pair(); - let abortable = futures::future::Abortable::new( - async move { - let result = fut.await; - let _ = pending_futures.send(FutureResult { - id, - result: result.map_err(EngineError::from), - }); - }, - abort_reg, - ); + let task = Arc::clone(self).run_future(fut, future_id, future_cancel); #[cfg(not(target_arch = "wasm32"))] tokio::spawn(async move { - let _ = abortable.await; + let _ = task.await; }); #[cfg(target_arch = "wasm32")] wasm_bindgen_futures::spawn_local(async move { - let _ = abortable.await; + let _ = task.await; }); - abort_handles.push(abort_handle); } } } VmExecState::Await(future_id) => { - Self::cancellation_safepoint(cancel, &abort_handles)?; vm = self.gc_safepoint(vm).await; - - // First, drain any already-completed futures. - while let Ok(future) = processed_futures.try_recv() { - let external = future.result?; - let value = self.convert_external_to_vm_value(&mut vm, external); - vm.fulfil_future(future.id, value) - .map_err(EngineError::VmInternalError)?; - - if future.id == future_id { - continue 'vm_exec; - } - } - - // We gotta wait for the target future. - // Race against cancellation — `biased` ensures the cancel - // branch is checked first, matching legacy orchestrator behavior. - loop { - tokio::select! { - biased; - () = cancel.cancelled() => { - // Abort all in-flight spawned tasks to stop - // HTTP requests, sleeps, etc. immediately. - abort_handles.abort_all(); - return Err(EngineError::Cancelled); - } - future = processed_futures.recv() => { - let future = future - .ok_or(EngineError::FutureChannelClosed)?; - let external = future.result?; - let value = self.convert_external_to_vm_value( - &mut vm, - external, - ); - vm.fulfil_future(future.id, value) - .map_err(EngineError::VmInternalError)?; - - if future.id == future_id { - break; - } - } - } - } + let future_permit = self.futures.acquire().await; + let future = future_permit.future_ready(future_id)?; + drop(future_permit); + future.await?; } VmExecState::Event { @@ -1627,13 +1610,48 @@ impl BexEngine { } } VmExecState::EarlyYield => { - Self::cancellation_safepoint(cancel, &abort_handles)?; vm = self.gc_safepoint(vm).await; } } } } + /// Runs a future, handling cancellation. + /// When completed, updates the future state on the heap. + async fn run_future( + self: Arc, + future: OpFuture, + future_id: FutureId, + cancel: CancellationToken, + ) -> Result<(), EngineError> { + tokio::select! { + biased; + () = cancel.cancelled() => { + let mut future_guard = self.futures.acquire().await; + future_guard.cancel_future(future_id)?; + drop(future_guard); + Ok(()) + } + result = future => { + let mut future_guard = self.futures.acquire().await; + match result { + Ok(value) => { + let value = self.convert_external_to_vm_value( + &mut future_guard, + value, + ); + future_guard.fulfill_future(future_id, value)?; + } + Err(err) => { + future_guard.internal_error_future(future_id, err.into())?; + } + } + drop(future_guard); + Ok(()) + } + } + } + /// Build a `SpanContext` from the current `SpanState`. /// /// Returns the context for the innermost active span, or uses the root span diff --git a/baml_language/crates/bex_engine/tests/cancellation.rs b/baml_language/crates/bex_engine/tests/cancellation.rs index f579207c27..ea512c9894 100644 --- a/baml_language/crates/bex_engine/tests/cancellation.rs +++ b/baml_language/crates/bex_engine/tests/cancellation.rs @@ -9,11 +9,29 @@ mod common; use std::sync::Arc; use bex_engine::{ - BexEngine, BexExternalValue, CancellationToken, EngineError, FunctionCallContextBuilder, + BexEngine, BexExternalValue, CANCELLED_PANIC_CLASS, CancellationToken, EngineError, + FunctionCallContextBuilder, }; use common::compile_for_engine; use sys_native::SysOpsExt; +/// Asserts the result is an unhandled `baml.panics.Cancelled` panic. +#[track_caller] +fn assert_cancelled(result: &Result) { + match result { + Err(EngineError::UnhandledThrow { value, .. }) => match value.as_ref() { + BexExternalValue::Instance { class_name, .. } => { + assert_eq!( + class_name, CANCELLED_PANIC_CLASS, + "expected {CANCELLED_PANIC_CLASS} panic, got class {class_name}" + ); + } + other => panic!("expected panic Instance, got {other:?}"), + }, + other => panic!("expected UnhandledThrow({CANCELLED_PANIC_CLASS}), got {other:?}"), + } +} + // ============================================================================ // 1. Immediate cancellation — token already cancelled before call starts // ============================================================================ @@ -53,10 +71,7 @@ async fn cancel_before_call_returns_cancelled() { ) .await; - assert!( - matches!(result, Err(EngineError::Cancelled)), - "Expected EngineError::Cancelled, got: {result:?}" - ); + assert_cancelled(&result); } // ============================================================================ @@ -111,10 +126,7 @@ async fn cancel_during_sleep_returns_promptly() { let result = handle.await.expect("task panicked"); let elapsed = start.elapsed(); - assert!( - matches!(result, Err(EngineError::Cancelled)), - "Expected EngineError::Cancelled, got: {result:?}" - ); + assert_cancelled(&result); // Should return well before the 10s sleep completes. assert!( elapsed < std::time::Duration::from_secs(2), @@ -189,10 +201,7 @@ async fn cancel_during_http_returns_promptly() { let result = handle.await.expect("task panicked"); let elapsed = start.elapsed(); - assert!( - matches!(result, Err(EngineError::Cancelled)), - "Expected EngineError::Cancelled, got: {result:?}" - ); + assert_cancelled(&result); assert!( elapsed < std::time::Duration::from_secs(2), "Cancel took too long: {elapsed:?} (expected < 2s)" @@ -271,10 +280,7 @@ async fn selective_cancellation_only_affects_target() { let result_slow = handle_slow.await.expect("task panicked"); let result_fast = handle_fast.await.expect("task panicked"); - assert!( - matches!(result_slow, Err(EngineError::Cancelled)), - "Slow call should be cancelled, got: {result_slow:?}" - ); + assert_cancelled(&result_slow); assert_eq!( result_fast.expect("fast call failed"), BexExternalValue::Int(2), @@ -337,10 +343,7 @@ async fn cancel_interrupts_sequential_sleeps() { let result = handle.await.expect("task panicked"); let elapsed = start.elapsed(); - assert!( - matches!(result, Err(EngineError::Cancelled)), - "Expected EngineError::Cancelled, got: {result:?}" - ); + assert_cancelled(&result); assert!( elapsed < std::time::Duration::from_secs(3), "Cancel took too long: {elapsed:?} (expected < 3s)" @@ -433,5 +436,5 @@ async fn cancel_is_idempotent() { cancel.cancel(); // third cancel — still harmless let result = handle.await.expect("task panicked"); - assert!(matches!(result, Err(EngineError::Cancelled))); + assert_cancelled(&result); } diff --git a/baml_language/crates/bex_engine/tests/future_cleanup.rs b/baml_language/crates/bex_engine/tests/future_cleanup.rs new file mode 100644 index 0000000000..70a6e1492b --- /dev/null +++ b/baml_language/crates/bex_engine/tests/future_cleanup.rs @@ -0,0 +1,83 @@ +//! Verify that the engine's `FutureManager` removes entries for completed +//! futures so the tracking map does not grow unboundedly. + +mod common; + +use std::sync::Arc; + +use bex_engine::{BexEngine, BexExternalValue, FunctionCallContextBuilder}; +use common::compile_for_engine; +use sys_native::SysOpsExt; + +fn make_engine(source: &str) -> Arc { + let snapshot = compile_for_engine(source); + Arc::new( + BexEngine::new( + snapshot, + Arc::new(sys_native::SysOps::native()), + None, + Vec::new(), + ) + .expect("Failed to create engine"), + ) +} + +async fn call_main(engine: &Arc) -> BexExternalValue { + engine + .call_function( + "main", + vec![], + FunctionCallContextBuilder::new(sys_types::CallId::next()).build(), + true, + ) + .await + .expect("call should succeed") +} + +#[tokio::test] +async fn active_futures_drains_after_awaited_sleep() { + // A function that schedules and awaits several async sys-ops should + // leave no entries behind in the future manager once it returns. + let source = r#" + function main() -> int { + baml.sys.sleep(0); + baml.sys.sleep(0); + baml.sys.sleep(0); + 42 + } + "#; + + let engine = make_engine(source); + assert_eq!(engine.active_future_count().await, 0); + + let result = call_main(&engine).await; + assert_eq!(result, BexExternalValue::Int(42)); + assert_eq!(engine.active_future_count().await, 0); +} + +#[tokio::test] +async fn active_futures_drains_across_concurrent_calls() { + // Run several calls concurrently and verify the count is back to zero + // after they all complete. This catches a regression where one call's + // entries linger and bias the count. + let source = r#" + function main() -> int { + baml.sys.sleep(0); + 7 + } + "#; + + let engine = make_engine(source); + + let mut handles = Vec::new(); + for _ in 0..8 { + let engine = Arc::clone(&engine); + handles.push(tokio::spawn(async move { call_main(&engine).await })); + } + for handle in handles { + let value = handle.await.expect("task should not panic"); + assert_eq!(value, BexExternalValue::Int(7)); + } + + assert_eq!(engine.active_future_count().await, 0); +} diff --git a/baml_language/crates/bex_heap/src/accessor.rs b/baml_language/crates/bex_heap/src/accessor.rs index e2167bcc6a..5297ac8e36 100644 --- a/baml_language/crates/bex_heap/src/accessor.rs +++ b/baml_language/crates/bex_heap/src/accessor.rs @@ -10,7 +10,7 @@ use bex_vm_types::{HeapPtr, Object, Value}; use crate::{BexHeap, heap_guard::PermitProof}; -#[derive(Debug, PartialEq, thiserror::Error)] +#[derive(Debug, PartialEq, thiserror::Error, Clone)] pub enum AccessError { #[error("Invalid handle: expected {expected}")] InvalidHandle { expected: &'static str }, @@ -592,6 +592,7 @@ fn owned_inner( Object::Class(..) => unconvertible("class"), Object::Enum(..) => unconvertible("enum"), Object::Future(..) => unconvertible("future"), + Object::UnscheduledFuture(..) => unconvertible("unscheduled_future"), Object::String(s) => Ok(BexExternalValue::String(s.clone())), // Deep-copy path for trace payloads: no declared type is available here, diff --git a/baml_language/crates/bex_heap/src/gc.rs b/baml_language/crates/bex_heap/src/gc.rs index 3a7bbb2eea..242b2b7288 100644 --- a/baml_language/crates/bex_heap/src/gc.rs +++ b/baml_language/crates/bex_heap/src/gc.rs @@ -313,20 +313,26 @@ impl BexHeap { Object::Future(fut) => { use bex_vm_types::Future; match fut { - Future::Pending(pending) => { - for value in &pending.args { - if let Value::Object(ptr) = value { - worklist.push(*ptr); - } - } - } Future::Ready(value) => { if let Value::Object(ptr) = value { worklist.push(*ptr); } } + Future::Error(Value::Object(ptr)) => { + worklist.push(*ptr); + } + Future::Pending(_) + | Future::Error(_) + | Future::Cancelled + | Future::InternalError(_) => {} } } + Object::UnscheduledFuture(future) => { + worklist.extend(future.args.iter().filter_map(|v| match v { + Value::Object(ptr) => Some(*ptr), + _ => None, + })); + } // Primitives have no references #[cfg(feature = "heap_debug")] Object::Sentinel(_) => {} @@ -403,14 +409,15 @@ impl BexHeap { Object::Future(fut) => { use bex_vm_types::Future; match fut { - Future::Pending(pending) => { - for value in &mut pending.args { - self.fixup_value(value, forwarding); - } - } - Future::Ready(value) => { + Future::Ready(value) | Future::Error(value) => { self.fixup_value(value, forwarding); } + Future::Pending(_) | Future::Cancelled | Future::InternalError(_) => {} + } + } + Object::UnscheduledFuture(future) => { + for value in &mut future.args { + self.fixup_value(value, forwarding); } } // Primitives have no references @@ -652,24 +659,25 @@ impl BexHeap { Object::Future(fut) => { use bex_vm_types::Future; match fut { - Future::Pending(pending) => { - worklist.extend( - pending - .args - .iter() - .filter_map(Value::as_object_ptr) - .filter(|ptr| self.generation_of(*ptr).is_young()), - ); - } - Future::Ready(value) => { + Future::Ready(value) | Future::Error(value) => { if let Value::Object(ptr) = value && self.generation_of(*ptr).is_young() { worklist.push(*ptr); } } + Future::Pending(_) | Future::Cancelled | Future::InternalError(_) => {} } } + Object::UnscheduledFuture(future) => { + worklist.extend( + future + .args + .iter() + .filter_map(Value::as_object_ptr) + .filter(|ptr| self.generation_of(*ptr).is_young()), + ); + } // Primitives/leaf variants have no heap references. #[cfg(feature = "heap_debug")] Object::Sentinel(_) => {} @@ -1755,17 +1763,17 @@ mod tests { #[test] fn test_gc_traces_future_pending_args() { - use bex_vm_types::{Future, PendingFuture, SysOp}; + use bex_vm_types::{SysOp, UnscheduledFuture}; let heap = BexHeap::new(vec![]); let mut tlab = Tlab::new(Arc::clone(&heap)); let arg1 = tlab.alloc_string("arg1".to_string()); let arg2 = tlab.alloc_string("arg2".to_string()); - let future_ptr = tlab.alloc(Object::Future(Future::Pending(PendingFuture { + let future_ptr = tlab.alloc(Object::UnscheduledFuture(UnscheduledFuture { operation: SysOp::BamlEnvGet, args: vec![Value::Object(arg1), Value::Object(arg2)], - }))); + })); let roots = vec![future_ptr]; let (stats, _new_roots, _) = unsafe { heap.collect_garbage(&roots) }; @@ -2213,7 +2221,7 @@ mod tests { fn test_tracing_and_fixup_consistency_all_variants() { use baml_type::{Name, TyAttr, TypeName}; use bex_vm_types::{ - Class, Enum, Future, PendingFuture, SysOp, + Class, Enum, SysOp, UnscheduledFuture, types::{Cell, Closure, Instance, Variant}, }; @@ -2250,11 +2258,11 @@ mod tests { value: Value::Object(leaf_for_cell), })); - // --- Container: Object::Future (Pending) --- - let future_container = tlab.alloc(Object::Future(Future::Pending(PendingFuture { + // --- Container: Object::UnscheduledFuture --- + let future_container = tlab.alloc(Object::UnscheduledFuture(UnscheduledFuture { operation: SysOp::BamlEnvGet, args: vec![Value::Object(leaf_for_future)], - }))); + })); // --- Container: Object::Instance --- // Instance requires a class pointer. @@ -2361,8 +2369,8 @@ mod tests { assert_eq!(s, "cell_value"); // Future: args[0] should be the (forwarded) future arg string. - let Object::Future(Future::Pending(pending)) = (unsafe { new_roots[4].get() }) else { - panic!("new_roots[4] not Future::Pending") + let Object::UnscheduledFuture(pending) = (unsafe { new_roots[4].get() }) else { + panic!("new_roots[4] not UnscheduledFuture") }; let Value::Object(fut_leaf) = pending.args[0] else { panic!("future.args[0] not Object") diff --git a/baml_language/crates/bex_heap/src/heap.rs b/baml_language/crates/bex_heap/src/heap.rs index 88e59c6efe..d45e7c1766 100644 --- a/baml_language/crates/bex_heap/src/heap.rs +++ b/baml_language/crates/bex_heap/src/heap.rs @@ -21,6 +21,7 @@ use std::{ }, }; +use ::bex_vm_types::Value; use bex_external_types::{Handle, WeakHeapRef}; use bex_vm_types::{HeapPtr, Object, ObjectIndex}; @@ -462,6 +463,42 @@ impl BexHeap { unsafe { &mut *self.inactive.get() } } + /// Write barrier for field/element/cell writes. + /// + /// Called *before* the actual field write at each mutation site. If `container_ptr` + /// is in an older generation than the object being written (`written_value`), the + /// card containing `container_ptr` is marked dirty so partial GC can discover + /// the cross-generation reference. + /// + /// This is a no-op when either side is not a heap object, or when the container + /// is in Gen0 (no card table for Gen0). + #[inline] + pub fn write_barrier(&self, container_ptr: HeapPtr, written_value: Value) { + if let Value::Object(ref_ptr) = written_value { + let container_gen = self.generation_of(container_ptr); + let ref_gen = self.generation_of(ref_ptr); + if container_gen > ref_gen { + self.mark_card_for_ptr(container_ptr); + } + } + } + + /// Conservative write barrier for mutable accessor paths (builtin dispatch). + /// + /// Unconditionally marks the card dirty if `container_ptr` is in an older + /// generation. Used by `as_array_mut` / `as_map_mut` where the actual written + /// value is not yet known (it's supplied by the callee trait method). + /// + /// This over-marks (any mutable access to an older-gen object dirties the card), + /// but it is always safe and the cost is negligible since most objects are Gen0. + #[inline] + pub fn conservative_write_barrier(&self, container_ptr: HeapPtr) { + let container_gen = self.generation_of(container_ptr); + if container_gen > Generation::Gen0 { + self.mark_card_for_ptr(container_ptr); + } + } + /// Determine which generation an object pointer belongs to. /// /// Only compile-time, Gen2, and Gen1 are checked directly; anything else is diff --git a/baml_language/crates/bex_heap/src/heap_debugger/real.rs b/baml_language/crates/bex_heap/src/heap_debugger/real.rs index b8f501cbfb..1ae9a7e339 100644 --- a/baml_language/crates/bex_heap/src/heap_debugger/real.rs +++ b/baml_language/crates/bex_heap/src/heap_debugger/real.rs @@ -332,15 +332,16 @@ impl BexHeap { ); } Object::Future(fut) => match fut { - Future::Pending(pending) => { - for value in &pending.args { - self.debug_assert_valid_value(value); - } - } - Future::Ready(value) => { + Future::Ready(value) | Future::Error(value) => { self.debug_assert_valid_value(value); } + Future::Pending(_) | Future::Cancelled | Future::InternalError(_) => {} }, + Object::UnscheduledFuture(future) => { + for value in &future.args { + self.debug_assert_valid_value(value); + } + } Object::Closure(closure) => { self.debug_assert_valid_index(closure.function); for value in &closure.captures { diff --git a/baml_language/crates/bex_heap/src/heap_guard.rs b/baml_language/crates/bex_heap/src/heap_guard.rs index 64ea9a499f..aa2eeb9bb0 100644 --- a/baml_language/crates/bex_heap/src/heap_guard.rs +++ b/baml_language/crates/bex_heap/src/heap_guard.rs @@ -15,6 +15,7 @@ use ::std::{ collections::HashMap, sync::{Arc, Weak}, }; +use ::tokio::sync::Mutex; /// The lesser of [`u32::MAX`] and [`tokio::sync::Semaphore::MAX_PERMITS`] (depends on compilation target pointer width). const MAX_PERMITS: u32 = { @@ -32,6 +33,24 @@ const MAX_PERMITS: u32 = { } }; +pub trait HeapPermit { + /// Get a reference to the root holder (for example, the active VM) + /// + /// Callers can also use [`Deref`] which will return the same value. + fn holder(&self) -> &T; + /// Get a mutable reference to the root holder (for example, the active VM) + /// + /// Callers can also use [`DerefMut`] which will return the same value. + fn holder_mut(&mut self) -> &mut T; + /// Get a type-erased [`PermitProof`] tied to this active permit's lifetime. + /// + /// This lets the GC-exclusion proof flow through APIs (e.g. the sys-op + /// dispatch glue) that cannot name the concrete `T`. The returned proof + /// is `Copy`, `Send`, and `Sync` and carries no runtime data — the + /// guarantee comes purely from the lifetime, which cannot outlive `self`. + fn proof(&self) -> PermitProof<'_>; +} + /// An active heap permit. /// /// Provides non-exclusive access to the heap for the contained [`RootHaver`]. @@ -71,33 +90,17 @@ impl ActiveHeapPermit { pub async fn renew(self) -> Self { self.release().acquire().await } - - /// Get a reference to the root holder (for example, the active VM) - /// - /// Callers can also use [`Deref`] which will return the same value. - #[inline] - pub fn holder(&self) -> &T { +} +impl HeapPermit for ActiveHeapPermit { + fn holder(&self) -> &T { // SAFETY: we have a permit to access the heap so we can access the root holder. unsafe { self.state.holder() } } - - /// Get a mutable reference to the root holder (for example, the active VM) - /// - /// Callers can also use [`DerefMut`] which will return the same value. - #[inline] - pub fn holder_mut(&mut self) -> &mut T { + fn holder_mut(&mut self) -> &mut T { // SAFETY: we have a permit to access the heap so we can access the root holder. unsafe { self.state.holder_mut() } } - - /// Get a type-erased [`PermitProof`] tied to this active permit's lifetime. - /// - /// This lets the GC-exclusion proof flow through APIs (e.g. the sys-op - /// dispatch glue) that cannot name the concrete `T`. The returned proof - /// is `Copy`, `Send`, and `Sync` and carries no runtime data — the - /// guarantee comes purely from the lifetime, which cannot outlive `self`. - #[inline] - pub fn proof(&self) -> PermitProof<'_> { + fn proof(&self) -> PermitProof<'_> { PermitProof { _marker: PhantomData, } @@ -174,6 +177,76 @@ impl InactiveHeapPermit { } } +/// For use when multiple threads need to share a single permit. +/// +/// Ensures only one thread can use the permit at a time, +/// and yields to exclusive access whenever the permit is released. +pub struct SharedHeapPermit { + inner: Mutex>, +} +impl SharedHeapPermit { + pub fn new(inner: InactiveHeapPermit) -> Self { + Self { + inner: Mutex::new(inner), + } + } + pub async fn acquire(&self) -> SharedHeapPermitGuard<'_, T> { + let state = self.inner.lock().await; + let permit = Arc::clone(&state.active) + .acquire_owned() + .await + .unwrap_or_else(|_| unreachable!("Semaphore should never be closed")); + SharedHeapPermitGuard { + state, + _permit: permit, + _marker: PhantomData, + } + } +} + +pub struct SharedHeapPermitGuard<'a, T: RootHaver> { + state: tokio::sync::MutexGuard<'a, InactiveHeapPermit>, + _permit: tokio::sync::OwnedSemaphorePermit, + /// Ties the auto `Send`/`Sync` of `ActiveHeapPermit` to `T`. + /// + /// Without this marker, every field of this struct is unconditionally + /// `Sync` (notably because [`PermitCell`] has a manual unconditional + /// `unsafe impl Sync` — which is itself load-bearing, so that + /// `Weak>` can live in the manager's shared + /// `Mutex>`). That would let the compiler auto-derive + /// `ActiveHeapPermit: Sync` even when `T: !Sync`, and two threads + /// sharing `&ActiveHeapPermit` could each call [`Self::holder`] to + /// observe `&T` at the same time — UB when `T: !Sync`. + _marker: PhantomData, +} +impl<'a, T: RootHaver> HeapPermit for SharedHeapPermitGuard<'a, T> { + fn holder(&self) -> &T { + // SAFETY: we have a permit to access the heap so we can access the root holder. + unsafe { self.state.holder() } + } + fn holder_mut(&mut self) -> &mut T { + // SAFETY: we have a permit to access the heap so we can access the root holder. + unsafe { self.state.holder_mut() } + } + fn proof(&self) -> PermitProof<'_> { + PermitProof { + _marker: PhantomData, + } + } +} +impl<'a, T: RootHaver> Deref for SharedHeapPermitGuard<'a, T> { + type Target = T; + + fn deref(&self) -> &Self::Target { + self.holder() + } +} +impl<'a, T: RootHaver> DerefMut for SharedHeapPermitGuard<'a, T> { + fn deref_mut(&mut self) -> &mut Self::Target { + self.holder_mut() + } +} + /// The central heap coordination system. /// /// At any given time, there can be either: diff --git a/baml_language/crates/bex_heap/src/lib.rs b/baml_language/crates/bex_heap/src/lib.rs index d92ecbfe23..0f1fd8ddaa 100644 --- a/baml_language/crates/bex_heap/src/lib.rs +++ b/baml_language/crates/bex_heap/src/lib.rs @@ -74,6 +74,7 @@ pub use gc::{CollectionLevel, GcStats}; pub use heap::{BexHeap, DEFAULT_TLAB_SIZE, Generation, HeapStats}; pub(crate) use heap_debugger::{HeapDebuggerConfig, HeapDebuggerState}; pub use heap_guard::{ - ActiveHeapPermit, HeapGuard, HeapPermitManager, InactiveHeapPermit, PermitProof, + ActiveHeapPermit, HeapGuard, HeapPermit, HeapPermitManager, InactiveHeapPermit, PermitProof, + SharedHeapPermit, SharedHeapPermitGuard, }; -pub use tlab::Tlab; +pub use tlab::{Tlab, TlabHolder}; diff --git a/baml_language/crates/bex_heap/src/tlab.rs b/baml_language/crates/bex_heap/src/tlab.rs index 0a7327e102..557ee0db56 100644 --- a/baml_language/crates/bex_heap/src/tlab.rs +++ b/baml_language/crates/bex_heap/src/tlab.rs @@ -171,6 +171,36 @@ impl Tlab { self.alloc(Object::Variant(Variant { enm, index })) } + /// Allocate a uint8 array object. + #[inline] + pub fn alloc_uint8array(&mut self, data: Vec) -> HeapPtr { + self.alloc(Object::Uint8Array(data)) + } + + /// Allocate opaque Rust data on the heap. + #[inline] + pub fn alloc_rust_data(&mut self, data: Arc) -> HeapPtr { + self.alloc(Object::RustData(data)) + } + + /// Allocate a collector object on the heap. + #[inline] + pub fn alloc_collector(&mut self, collector: bex_vm_types::CollectorRef) -> HeapPtr { + self.alloc(Object::Collector(collector)) + } + + /// Allocate a type descriptor object on the heap. + #[inline] + pub fn alloc_type(&mut self, ty: baml_type::Ty) -> HeapPtr { + self.alloc(Object::Type(Box::new(ty))) + } + + /// Allocate a future object on the heap. + #[inline] + pub fn alloc_future(&mut self, future: bex_vm_types::Future) -> HeapPtr { + self.alloc(Object::Future(future)) + } + /// Get a new chunk from the heap (cold path). #[cold] fn refill(&mut self) { @@ -248,6 +278,11 @@ impl std::fmt::Debug for Tlab { } } +pub trait TlabHolder { + fn tlab(&self) -> &Tlab; + fn tlab_mut(&mut self) -> &mut Tlab; +} + #[cfg(test)] mod tests { use super::*; diff --git a/baml_language/crates/bex_project/src/bex.rs b/baml_language/crates/bex_project/src/bex.rs index 8cfbed03f4..cd7a8b7ddf 100644 --- a/baml_language/crates/bex_project/src/bex.rs +++ b/baml_language/crates/bex_project/src/bex.rs @@ -1,3 +1,4 @@ +use ::bex_heap::HeapPermit as _; use ::std::sync::Arc; use async_trait::async_trait; use baml_type::Ty; diff --git a/baml_language/crates/bex_project/src/lib.rs b/baml_language/crates/bex_project/src/lib.rs index b5aad585de..b36e63fd3f 100644 --- a/baml_language/crates/bex_project/src/lib.rs +++ b/baml_language/crates/bex_project/src/lib.rs @@ -11,7 +11,9 @@ use std::{collections::HashMap, sync::Arc}; pub use baml_builtins2::{MediaContent, MediaValue, PromptAst, PromptAstSimple}; pub use bex::Bex; -pub use bex_engine::{EngineError, FunctionCallContextBuilder}; +pub use bex_engine::{ + CANCELLED_PANIC_CLASS, EngineError, FunctionCallContextBuilder, is_cancelled_engine_error, +}; pub use bex_events::EventSink; pub use bex_external_types::{ BexExternalAdt, BexExternalValue, Handle, MediaKind, Ty, TyAttr, try_convert_rust_data, @@ -58,6 +60,14 @@ pub enum RuntimeError { Access(#[from] bex_heap::AccessError), } +/// True iff `err` wraps an engine cancellation panic. +/// +/// Centralizes the cancellation-classification logic that bridges and the +/// LSP server need to distinguish cancellation from other runtime errors. +pub fn is_cancelled_runtime_error(err: &RuntimeError) -> bool { + matches!(err, RuntimeError::Engine(e) if is_cancelled_engine_error(e)) +} + /// Keep pass-by-value so the returned `Arc` does not capture caller locals; /// taking `&VfsPath` / `&HashMap` would require returning a value that references them. #[allow(clippy::needless_pass_by_value)] diff --git a/baml_language/crates/bex_vm/src/debug.rs b/baml_language/crates/bex_vm/src/debug.rs index 9a4ca7910b..33d7f97229 100644 --- a/baml_language/crates/bex_vm/src/debug.rs +++ b/baml_language/crates/bex_vm/src/debug.rs @@ -30,7 +30,7 @@ use std::fmt::Write; use bex_vm_types::{ HeapPtr, bytecode::Instruction, - indexable::{GlobalIndex, GlobalPool, ObjectPool}, + indexable::{GlobalIndex, ObjectPool}, types::{Function, Object, Value}, }; use colored::{Color, Colorize}; @@ -50,13 +50,13 @@ pub enum BytecodeFormat { /// Resolve a global reference (global slot or callee slot) to display metadata. fn display_global_ref( index: GlobalIndex, - globals: &GlobalPool, + globals: &[Value], objects: Option<&ObjectPool>, compile_time_globals: Option<&[bex_vm_types::ConstValue]>, ) -> String { // Prefer runtime globals. if index.raw() < globals.len() { - return format!("({})", display_value(&globals[index])); + return format!("({})", display_value(&globals[index.raw()])); } // At compile time, resolve from compile-time globals/object pool. @@ -113,7 +113,7 @@ fn sanitize_operand_text(text: &str) -> String { pub(crate) fn display_instruction( instruction_ptr: usize, function: &Function, - globals: &GlobalPool, + globals: &[Value], objects: Option<&ObjectPool>, compile_time_globals: Option<&[bex_vm_types::ConstValue]>, ) -> (String, String) { @@ -409,7 +409,7 @@ impl Col { /// symmetric and returns the entire table. pub fn display_bytecode( function: &Function, - globals: &GlobalPool, + globals: &[Value], objects: Option<&ObjectPool>, compile_time_globals: Option<&[bex_vm_types::ConstValue]>, use_colors: bool, diff --git a/baml_language/crates/bex_vm/src/errors.rs b/baml_language/crates/bex_vm/src/errors.rs index a5b99af209..59045b1a8b 100644 --- a/baml_language/crates/bex_vm/src/errors.rs +++ b/baml_language/crates/bex_vm/src/errors.rs @@ -26,6 +26,9 @@ pub enum VmPanic { #[error("unreachable code executed")] Unreachable, + #[error("operation cancelled")] + Cancelled, + /// A user-caused panic from `baml.sys.panic`. #[error("baml.sys.panic: {message}")] UserPanic { message: String }, @@ -126,6 +129,12 @@ pub enum VmInternalError { #[error("Invalid manual notify")] InvalidManualNotify, + + /// `StoreGlobal` was executed outside of an `$init` function. Globals are + /// frozen post-`$init` (shared as `Arc<[Value]>` across VMs) and any + /// post-init `StoreGlobal` violates that invariant. + #[error("StoreGlobal executed outside of $init (globals are frozen post-init)")] + StoreGlobalAfterInit, } /// Any kind of virtual machine error. diff --git a/baml_language/crates/bex_vm/src/package_baml/root.rs b/baml_language/crates/bex_vm/src/package_baml/root.rs index 1f62f74769..01a12ce2f4 100644 --- a/baml_language/crates/bex_vm/src/package_baml/root.rs +++ b/baml_language/crates/bex_vm/src/package_baml/root.rs @@ -96,6 +96,7 @@ fn deep_copy_value_recursive( Object::Variant(v) => vm.tlab.alloc(Object::Variant(v)), Object::RustData(arc) => vm.tlab.alloc(Object::RustData(Arc::clone(&arc))), Object::Future(f) => vm.tlab.alloc(Object::Future(f)), + Object::UnscheduledFuture(f) => vm.tlab.alloc(Object::UnscheduledFuture(f)), Object::Collector(c) => vm.tlab.alloc(Object::Collector(c)), Object::Type(ty) => vm.tlab.alloc(Object::Type(ty)), // Closures, bound methods, and cells are shallow-copied: the captured @@ -205,18 +206,20 @@ fn deep_equals_recursive( (Future::Ready(a_val), Future::Ready(b_val)) => { deep_equals_recursive(vm, *a_val, *b_val, visited) } - (Future::Pending(a_pend), Future::Pending(b_pend)) => { - a_pend.operation == b_pend.operation - && a_pend.args.len() == b_pend.args.len() - && a_pend - .args - .iter() - .zip(b_pend.args.iter()) - .all(|(a, b)| deep_equals_recursive(vm, *a, *b, visited)) - } + (Future::Pending(a_id), Future::Pending(b_id)) => a_id == b_id, _ => false, }, + (Object::UnscheduledFuture(a_fut), Object::UnscheduledFuture(b_fut)) => { + a_fut.operation == b_fut.operation + && a_fut.args.len() == b_fut.args.len() + && a_fut + .args + .iter() + .zip(b_fut.args.iter()) + .all(|(a, b)| deep_equals_recursive(vm, *a, *b, visited)) + } + _ => false, }; diff --git a/baml_language/crates/bex_vm/src/package_baml/unstable.rs b/baml_language/crates/bex_vm/src/package_baml/unstable.rs index 8890698e8e..c2b6452e18 100644 --- a/baml_language/crates/bex_vm/src/package_baml/unstable.rs +++ b/baml_language/crates/bex_vm/src/package_baml/unstable.rs @@ -126,6 +126,7 @@ fn format_value_recursive( Object::Function(f) => Ok(format!("", f.name)), Object::Class(c) => Ok(format!("", c.name)), Object::Future(_) => Ok("".to_string()), + Object::UnscheduledFuture(_) => Ok("".to_string()), Object::Collector(_) => Ok("".to_string()), Object::Type(ty) => Ok(format!("")), Object::Uint8Array(bytes) => Ok(format!("", bytes.len())), diff --git a/baml_language/crates/bex_vm/src/vm.rs b/baml_language/crates/bex_vm/src/vm.rs index 17e1a16ce3..9e7e618ed4 100644 --- a/baml_language/crates/bex_vm/src/vm.rs +++ b/baml_language/crates/bex_vm/src/vm.rs @@ -16,18 +16,22 @@ use std::{collections::HashMap, sync::Arc}; -use ::bex_vm_types::{EarlyYieldCheck, RootHaver, types::ErrorClass}; +use ::bex_heap::TlabHolder; +use ::bex_vm_types::{ + EarlyYieldCheck, RootHaver, + types::{ErrorClass, FutureId}, +}; use ::core::any::TypeId; #[cfg(not(target_arch = "wasm32"))] use ::core::sync::atomic::AtomicBool; -use bex_heap::{BexHeap, Generation, Tlab}; +use bex_heap::{BexHeap, Tlab}; use bex_vm_types::{ - BinOp, CmpOp, FunctionKind, GlobalPool, HeapPtr, Instruction, Object, ObjectIndex, ObjectPool, - ObjectType, PanicClass, StackIndex, UnaryOp, Value, Variant, + BinOp, CmpOp, FunctionKind, HeapPtr, Instruction, Object, ObjectIndex, ObjectPool, ObjectType, + PanicClass, StackIndex, UnaryOp, Value, Variant, VmGlobals, bytecode::{self, BlockNotification}, types::{ - BoundMethod, Cell, Closure, Function, FunctionType, Future, FutureType, Instance, - PendingFuture, Type, + BoundMethod, Cell, Closure, Function, FunctionType, Future, FutureType, Instance, Type, + UnscheduledFuture, }, }; use indexmap::IndexMap; @@ -247,7 +251,13 @@ pub struct BexVm { /// Global variables. /// /// This stores the functions and globally declared variables. - pub globals: GlobalPool, + /// + /// During `$init`, this is `VmGlobals::Owned` so `StoreGlobal` can + /// populate top-level let bindings. After `$init` completes, the engine + /// freezes the pool into a shared `Arc<[Value]>` and every subsequent VM + /// is constructed with `VmGlobals::Shared`; `StoreGlobal` against the + /// shared view is a `VmInternalError`. + pub globals: VmGlobals, /// Resolved class names mapping fully-qualified class names to their heap pointers. /// @@ -298,11 +308,19 @@ pub struct BexVm { #[allow(clippy::large_enum_variant)] #[derive(Debug, PartialEq)] pub enum VmExecState { - /// VM cannot proceed. It is awaiting a pending future to complete. - Await(HeapPtr), + /// Awaiting a pending future. + /// + /// - Input: a `FutureId` corresponding to a (probably) pending future + /// - Output (success): the future's result on top of the stack + /// - Output (failure): an exception/panic passed to the VM + /// - Output (internal error): engine error + Await(FutureId), /// VM notifies caller about a future that needs to be scheduled. /// + /// - Input: an `UnscheduledFuture` object + /// - Output: a `Future` object (may be pending or ready) pushed to the top of the stack + /// /// Bytecode execution continues when control flow is handled back to the /// VM. ScheduleFuture(HeapPtr), @@ -482,6 +500,7 @@ fn value_type_tag(value: &Value) -> i64 { Object::BoundMethod(_) => type_tags::FUNCTION, Object::Cell(_) => type_tags::UNKNOWN, Object::Future(_) => type_tags::FUTURE, + Object::UnscheduledFuture(_) => type_tags::FUTURE, Object::Enum(_) => type_tags::ENUM, Object::RustData(_) => type_tags::UNKNOWN, Object::Collector(_) => type_tags::COLLECTOR, @@ -508,7 +527,7 @@ impl BexVm { /// for contention-free allocation. pub fn new( heap: Arc, - globals: GlobalPool, + globals: VmGlobals, resolved_class_names: HashMap, #[cfg(not(target_arch = "wasm32"))] park_requested: Arc, argv: Arc<[String]>, @@ -592,42 +611,6 @@ impl BexVm { unsafe { ptr.get_mut() } } - /// Write barrier for field/element/cell writes. - /// - /// Called *before* the actual field write at each mutation site. If `container_ptr` - /// is in an older generation than the object being written (`written_value`), the - /// card containing `container_ptr` is marked dirty so partial GC can discover - /// the cross-generation reference. - /// - /// This is a no-op when either side is not a heap object, or when the container - /// is in Gen0 (no card table for Gen0). - #[inline] - pub fn write_barrier(&self, container_ptr: HeapPtr, written_value: Value) { - if let Value::Object(ref_ptr) = written_value { - let container_gen = self.heap.generation_of(container_ptr); - let ref_gen = self.heap.generation_of(ref_ptr); - if container_gen > ref_gen { - self.heap.mark_card_for_ptr(container_ptr); - } - } - } - - /// Conservative write barrier for mutable accessor paths (builtin dispatch). - /// - /// Unconditionally marks the card dirty if `container_ptr` is in an older - /// generation. Used by `as_array_mut` / `as_map_mut` where the actual written - /// value is not yet known (it's supplied by the callee trait method). - /// - /// This over-marks (any mutable access to an older-gen object dirties the card), - /// but it is always safe and the cost is negligible since most objects are Gen0. - #[inline] - pub(crate) fn conservative_write_barrier(&self, container_ptr: HeapPtr) { - let container_gen = self.heap.generation_of(container_ptr); - if container_gen > Generation::Gen0 { - self.heap.mark_card_for_ptr(container_ptr); - } - } - /// Collect all `HeapPtr`s stored in call frames. /// - For bytecode frames, the function pointer /// - For native frames, the continuation pointer as well as all native-held GC roots @@ -750,7 +733,7 @@ impl BexVm { // array may introduce cross-generation references. Used by builtin dispatch // (Array.push, Array.pop, etc.) where the actual written values are not // visible at this call site. - self.conservative_write_barrier(ptr); + self.heap.conservative_write_barrier(ptr); // Check type first to avoid borrow issues if !matches!(self.get_object(ptr), Object::Array(_)) { return Err(VmInternalError::TypeError { @@ -786,7 +769,7 @@ impl BexVm { // Conservative write barrier: any mutable access to an older-generation // map may introduce cross-generation references. Used by builtin dispatch // (Map.set, etc.) where the actual written values are not visible here. - self.conservative_write_barrier(index); + self.heap.conservative_write_barrier(index); // Check type first to avoid borrow issues if !matches!(self.get_object(index), Object::Map(_)) { return Err(VmInternalError::TypeError { @@ -830,13 +813,16 @@ impl BexVm { // Create heap with compile-time objects let heap = BexHeap::new(compile_time_objects); - // Convert compile-time globals (ConstValue) to runtime globals (Value) + // Convert compile-time globals (ConstValue) to runtime globals (Value). + // The `from_program` constructor is test-only — we hand the VM an + // `Owned` view so that any `$init` bytecode the test happens to drive + // can write to globals. let globals_vec: Vec = bytecode .globals .into_iter() .map(|cv| cv.to_value(|idx| heap.compile_time_ptr(idx.into_raw()))) .collect(); - let globals = GlobalPool::from_vec(globals_vec); + let globals = VmGlobals::Owned(bex_vm_types::GlobalPool::from_vec(globals_vec)); // Build resolved_class_names: convert ObjectIndex -> HeapPtr let resolved_class_names: HashMap = bytecode @@ -887,71 +873,23 @@ impl BexVm { self.frames.clear(); } - /// Returns a reference to the pending future. + /// Returns a reference to the unscheduled future at `future_ptr`. /// - /// Returns [`VmInternalError::TypeError`] if the future is not pending, or not a future. - pub fn pending_future(&self, future_ptr: HeapPtr) -> Result<&PendingFuture, VmInternalError> { + /// Returns [`VmInternalError::TypeError`] if the heap object is not an + /// `Object::UnscheduledFuture`. + pub fn unscheduled_future( + &self, + future_ptr: HeapPtr, + ) -> Result<&UnscheduledFuture, VmInternalError> { match self.get_object(future_ptr) { - Object::Future(Future::Pending(future)) => Ok(future), + Object::UnscheduledFuture(future) => Ok(future), other => Err(VmInternalError::TypeError { - expected: FutureType::Pending.into(), + expected: Type::Object(ObjectType::UnscheduledFuture), got: ObjectType::of(other).into(), }), } } - /// Set a future to Ready state without modifying the stack. - /// - /// Use this for sync operations that complete during `ScheduleFuture` handling, - /// before the VM reaches the `Await` instruction. The `Await` instruction will - /// extract the value from the Ready future. - pub fn set_future_ready( - &mut self, - future_ptr: HeapPtr, - value: Value, - ) -> Result<(), VmInternalError> { - // Write barrier before mutating the future object. - self.write_barrier(future_ptr, value); - - let Object::Future(future) = self.get_object_mut(future_ptr) else { - return Err(VmInternalError::TypeError { - expected: FutureType::Any.into(), - got: ObjectType::of(self.get_object(future_ptr)).into(), - }); - }; - - *future = Future::Ready(value); - Ok(()) - } - - /// Fulfill a future and replace the stack top if the VM is awaiting it. - /// - /// Use this for async operations that complete while the VM is blocked at - /// an `Await` instruction. This replaces the future on the stack with the - /// ready value so execution can continue. - pub fn fulfil_future( - &mut self, - future_ptr: HeapPtr, - value: Value, - ) -> Result<(), VmInternalError> { - self.set_future_ready(future_ptr, value)?; - - // At any given moment, the VM can only await a single future, because - // we can only call the AWAIT instruction on a future on top of the - // stack. If that future being await is fulfilled, we need to replace - // the future on the stack with the ready value so that the next - // instruction that the VM runs can use the value, not the future - // object. - if let Some(Value::Object(ptr)) = self.stack.last() { - if *ptr == future_ptr { - self.stack.pop(); - self.stack.push(value); - } - } - - Ok(()) - } - /// Allocates an array on the heap and returns it to the caller. pub fn alloc_array(&mut self, values: Vec) -> Value { Value::Object(self.tlab.alloc(Object::Array(values))) @@ -966,7 +904,7 @@ impl BexVm { } pub fn alloc_uint8array(&mut self, data: Vec) -> Value { - Value::Object(self.tlab.alloc(Object::Uint8Array(data))) + Value::Object(self.tlab.alloc_uint8array(data)) } /// TODO: Seems to low level for an embedder, provide an API that takes @@ -990,7 +928,7 @@ impl BexVm { /// Allocate a collector object on the heap. pub fn alloc_collector(&mut self, collector: bex_vm_types::CollectorRef) -> Value { - Value::Object(self.tlab.alloc(Object::Collector(collector))) + Value::Object(self.tlab.alloc_collector(collector)) } /// Get collector ref from a Value. @@ -1013,7 +951,7 @@ impl BexVm { /// /// Used by generated `copy::` structs for `$rust_type` fields. pub fn alloc_rust_data(&mut self, data: Arc) -> Value { - Value::Object(self.tlab.alloc(Object::RustData(data))) + Value::Object(self.tlab.alloc_rust_data(data)) } /// Downcast a `Value::Object` pointing to `Object::RustData` to `&T`. @@ -1081,7 +1019,7 @@ impl BexVm { /// Allocate a type descriptor object on the heap. pub fn alloc_type(&mut self, ty: baml_type::Ty) -> Value { - Value::Object(self.tlab.alloc(Object::Type(Box::new(ty)))) + Value::Object(self.tlab.alloc_type(ty)) } /// Stops the execution of the current bytecode in favor of the given @@ -1276,6 +1214,10 @@ impl BexVm { let msg = self.alloc_string("unreachable code executed".to_string()); (PanicClass::Unreachable, vec![msg]) } + VmPanic::Cancelled => { + let msg = self.alloc_string("operation cancelled".to_string()); + (PanicClass::Cancelled, vec![msg]) + } VmPanic::UserPanic { message } => { let msg = self.alloc_string(message); (PanicClass::UserPanic, vec![msg]) @@ -1883,6 +1825,7 @@ impl BexVm { /// Wraps `exec_inner` to convert `InternalError` → `TracedInternalError` /// with a captured stack trace. pub fn exec(&mut self) -> Result { + self.early_yield.reset(); match self.exec_inner() { Err(VmError::InternalError(err)) => { let trace = self.capture_stack_trace(); @@ -2019,7 +1962,7 @@ impl BexVm { let (instruction, metadata) = crate::debug::display_instruction( instruction_ptr, function, - &self.globals, + self.globals.as_slice(), None, None, ); @@ -2181,8 +2124,8 @@ impl BexVm { } Instruction::LoadGlobal(index) => { - let value = &self.globals[index]; - self.stack.push(*value); + let value = self.globals.get(index); + self.stack.push(value); } Instruction::StoreGlobal(index) => { @@ -2190,7 +2133,10 @@ impl BexVm { let value = self.stack.ensure_pop()?; // No write barrier: globals is a VM-owned root structure, not a heap object. - self.globals[index] = value; + // Only valid during `$init`; post-init globals are frozen in `Arc<[Value]>` + // and a write here is a VM internal error. + self.globals + .set(index, value, VmInternalError::StoreGlobalAfterInit)?; } Instruction::LoadField(index) => { @@ -2246,7 +2192,7 @@ impl BexVm { ); // Write barrier: mark card if instance is in an older generation than the value. - self.write_barrier(instance_index, new_value); + self.heap.write_barrier(instance_index, new_value); // Set the new value. if let Object::Instance(instance) = self.get_object_mut(instance_index) { @@ -2295,7 +2241,7 @@ impl BexVm { ); // Write barrier: mark card if instance is in an older generation than the value. - self.write_barrier(instance_index, new_value); + self.heap.write_barrier(instance_index, new_value); // Set the new value. if let Object::Instance(instance) = self.get_object_mut(instance_index) { @@ -2949,7 +2895,7 @@ impl BexVm { ); // Write barrier: mark card if array is in an older generation than the value. - self.write_barrier(array_object_index, new_value); + self.heap.write_barrier(array_object_index, new_value); // Set the new value. match self.get_object_mut(array_object_index) { @@ -3022,7 +2968,7 @@ impl BexVm { ); // Write barrier: mark card if map is in an older generation than the value. - self.write_barrier(map_index, new_value); + self.heap.write_barrier(map_index, new_value); // Set the new value. if let Object::Map(map) = self.get_object_mut(map_index) { @@ -3146,22 +3092,14 @@ impl BexVm { // Collect function call args and cleanup consumed stack. let future_args: Vec = self.stack.drain(args_offset..).collect(); - // Create the pending future with the SysOp enum. - let pending_future = PendingFuture { + // Create the unscheduled future to hand off to the embedder. + let pending_future = UnscheduledFuture { operation: sys_op, args: future_args, }; - // Allocate the future. - let future_value = self.alloc_future(Future::Pending(pending_future)); - - // Extract the index - let Value::Object(object_index) = future_value else { - unreachable!("alloc_future returns Value::Object") - }; - - // Now leave the future on top of the stack. - self.stack.push(future_value); + // Allocate the unscheduled future. + let object_index = self.tlab.alloc(Object::UnscheduledFuture(pending_future)); // Yield control flow back to the embedder. return Ok(Some(VmExecState::ScheduleFuture(object_index))); @@ -3186,12 +3124,39 @@ impl BexVm { match awaiting { // Can't do nothing, handle control flow back to embedder. - Future::Pending(_) => { - return Ok(Some(VmExecState::Await(index))); + &Future::Pending(future_id) => { + let Frame::Bytecode(bf) = &mut self.frames[*frame_idx] else { + unreachable!("exec loop frame is always Bytecode"); + }; + bf.instruction_ptr = instruction_ptr; // we will await again when the future completes + return Ok(Some(VmExecState::Await(future_id))); } // Return the ready value Future::Ready(value) => *value, + + // An error occurred while executing the future. + Future::Error(value) => return Err(VmError::Thrown(*value)), + + // The future was cancelled before completion. + Future::Cancelled => { + return Err(VmError::Thrown( + self.panic_to_exception_value(VmPanic::Cancelled), + )); + } + + // An unrecoverable internal error occurred while executing the + // future. Yield back to the engine so it can surface the original + // error from the FutureManager's `SetOnce` (the entry is leaked by + // design for InternalError, so the engine's `future_ready` will + // always find it and propagate the underlying `EngineError`). + &Future::InternalError(future_id) => { + let Frame::Bytecode(bf) = &mut self.frames[*frame_idx] else { + unreachable!("exec loop frame is always Bytecode"); + }; + bf.instruction_ptr = instruction_ptr; + return Ok(Some(VmExecState::Await(future_id))); + } } }; @@ -3753,7 +3718,7 @@ impl BexVm { .into()); }; // Write barrier before the mutable borrow so we hold only &self.heap. - self.write_barrier(cell_ptr, value); + self.heap.write_barrier(cell_ptr, value); // SAFETY: cell_ptr is a VM-owned Cell object; single-threaded. let obj = unsafe { cell_ptr.get_mut() }; let Object::Cell(cell) = obj else { @@ -3829,7 +3794,7 @@ impl BexVm { // Write barrier: mark card if cell is in an older generation than the value. // The shared borrows of `obj`/`closure` have ended, so `write_barrier` // (which borrows `self` immutably via `self.heap`) is safe here. - self.write_barrier(cell_ptr, value); + self.heap.write_barrier(cell_ptr, value); // SAFETY: cell_ptr is a VM-owned Cell object; single-threaded. let cell_obj = unsafe { cell_ptr.get_mut() }; let Object::Cell(cell) = cell_obj else { @@ -3948,3 +3913,12 @@ impl ::bex_vm_types::RootHaver for BexVm { } } } + +impl TlabHolder for BexVm { + fn tlab(&self) -> &Tlab { + &self.tlab + } + fn tlab_mut(&mut self) -> &mut Tlab { + &mut self.tlab + } +} diff --git a/baml_language/crates/bex_vm_types/src/bytecode.rs b/baml_language/crates/bex_vm_types/src/bytecode.rs index 77db19af6d..7a9e845338 100644 --- a/baml_language/crates/bex_vm_types/src/bytecode.rs +++ b/baml_language/crates/bex_vm_types/src/bytecode.rs @@ -183,6 +183,15 @@ pub enum Instruction { /// /// Format: `STORE_GLOBAL i` where `i` is the index of the global variable /// in the `Vm::globals` array. + /// + /// # Init-only invariant + /// + /// Only `$init` (or `$init_test`) functions may emit `StoreGlobal`. The + /// compiler enforces this by emitting `StoreGlobal` exclusively from + /// `compile_init_function`. Post-`$init` globals are shared across VMs as a + /// frozen `Arc<[Value]>`; the runtime treats a `StoreGlobal` against that + /// shared view as a `VmInternalError`. Hand-written or fuzzed bytecode that + /// emits `StoreGlobal` outside of `$init` will be rejected at runtime. StoreGlobal(GlobalIndex), /// Load a field of an object. diff --git a/baml_language/crates/bex_vm_types/src/indexable.rs b/baml_language/crates/bex_vm_types/src/indexable.rs index 5f8c2018cf..c9b04a47b0 100644 --- a/baml_language/crates/bex_vm_types/src/indexable.rs +++ b/baml_language/crates/bex_vm_types/src/indexable.rs @@ -11,7 +11,7 @@ //! This module provides a vector wrapper that needs specific types to index //! into it, thus solving the problem mentioned above at compile time. -use std::marker::PhantomData; +use std::{marker::PhantomData, sync::Arc}; use crate::{Object, Value}; @@ -242,6 +242,95 @@ impl<'a, T, K> std::iter::IntoIterator for &'a mut Pool { pub type GlobalPool = Pool; pub type ObjectPool = Pool; +/// The view of globals available to a [`crate::Object`]-aware VM. +/// +/// **Invariant**: only `$init` functions emit [`crate::Instruction::StoreGlobal`]. +/// The compiler enforces this by emitting `StoreGlobal` solely from +/// `compile_init_function`. The runtime enforces it via the [`VmGlobals::Owned`] +/// vs [`VmGlobals::Shared`] split: post-`$init` VMs hold a [`VmGlobals::Shared`] +/// reference into the engine's frozen globals, and a `StoreGlobal` against a +/// `Shared` view must be turned into a `VmInternalError` by the VM. +/// +/// # Variants +/// - [`VmGlobals::Owned`]: a mutable [`GlobalPool`] used during `$init` execution. +/// - [`VmGlobals::Shared`]: a frozen `Arc<[Value]>` shared by every post-`$init` VM. +/// Cloning is a refcount bump; reads are direct slice indexing. +#[derive(Clone, Debug)] +pub enum VmGlobals { + /// Mutable globals pool, used during `$init`. + Owned(GlobalPool), + /// Frozen globals shared across all post-`$init` VMs. + Shared(Arc<[Value]>), +} + +impl VmGlobals { + /// Number of globals in the view. + pub fn len(&self) -> usize { + match self { + Self::Owned(pool) => pool.len(), + Self::Shared(slice) => slice.len(), + } + } + + /// Whether the view is empty. + pub fn is_empty(&self) -> bool { + self.len() == 0 + } + + /// Read a global by index. `Value` is `Copy` so this returns by value. + /// + /// # Panics + /// Panics if `index` is out of bounds — same contract as slice indexing. + pub fn get(&self, index: GlobalIndex) -> Value { + match self { + Self::Owned(pool) => pool[index], + Self::Shared(slice) => slice[index.into_raw()], + } + } + + /// Mutably set a global. Only succeeds for [`VmGlobals::Owned`]; returns the + /// caller-supplied error for [`VmGlobals::Shared`] (post-`$init` writes + /// violate the invariant and the VM should treat them as internal errors). + pub fn set(&mut self, index: GlobalIndex, value: Value, on_shared: E) -> Result<(), E> { + match self { + Self::Owned(pool) => { + pool[index] = value; + Ok(()) + } + Self::Shared(_) => Err(on_shared), + } + } + + /// Freeze this view into a shared `Arc<[Value]>`. For [`VmGlobals::Owned`], + /// this consumes the underlying `Vec`; for [`VmGlobals::Shared`], it clones + /// the existing `Arc`. + pub fn freeze(self) -> Arc<[Value]> { + match self { + Self::Owned(pool) => Arc::from(pool.0), + Self::Shared(slice) => slice, + } + } + + /// View the globals as a slice. Both variants support O(1) slice access. + pub fn as_slice(&self) -> &[Value] { + match self { + Self::Owned(pool) => &pool.0, + Self::Shared(slice) => slice, + } + } +} + +impl std::ops::Index for VmGlobals { + type Output = Value; + + fn index(&self, index: GlobalIndex) -> &Self::Output { + match self { + Self::Owned(pool) => &pool[index], + Self::Shared(slice) => &slice[index.into_raw()], + } + } +} + pub type StackIndex = Index; pub type GlobalIndex = Index; diff --git a/baml_language/crates/bex_vm_types/src/lib.rs b/baml_language/crates/bex_vm_types/src/lib.rs index 1d677f891c..450d552731 100644 --- a/baml_language/crates/bex_vm_types/src/lib.rs +++ b/baml_language/crates/bex_vm_types/src/lib.rs @@ -21,14 +21,14 @@ pub use bytecode::{ VizNodeMeta, VizNodeType, }; pub use heap_ptr::HeapPtr; -pub use indexable::{GlobalIndex, GlobalPool, ObjectIndex, ObjectPool, StackIndex}; +pub use indexable::{GlobalIndex, GlobalPool, ObjectIndex, ObjectPool, StackIndex, VmGlobals}; pub use roots::RootHaver; pub use types::{ Class, ClassField, ClientBuildMeta, ClientBuildType, CollectorRef, ConstValue, Enum, EnumVariant, Function, FunctionKind, FunctionMeta, FunctionOrigin, Future, Instance, - MediaValue, Object, ObjectType, PanicClass, PendingFuture, Program, PromptAst, RetryPolicyMeta, - SysOp, SysOpErrorCategory, SysOpPanicCategory, TestArgValue, TestCase, Value, Variant, - sys_op_for_path, type_tags, + MediaValue, Object, ObjectType, PanicClass, Program, PromptAst, RetryPolicyMeta, SysOp, + SysOpErrorCategory, SysOpPanicCategory, TestArgValue, TestCase, UnscheduledFuture, Value, + Variant, sys_op_for_path, type_tags, }; /// Used to check if the VM should yield early. diff --git a/baml_language/crates/bex_vm_types/src/types.rs b/baml_language/crates/bex_vm_types/src/types.rs index 7838cd7991..1ae7350073 100644 --- a/baml_language/crates/bex_vm_types/src/types.rs +++ b/baml_language/crates/bex_vm_types/src/types.rs @@ -31,6 +31,12 @@ pub struct Program { pub objects: ObjectPool, /// Global variables (converted from `ConstValue` to Value at load time). + /// + /// # Init-only invariant + /// Globals are populated by `$init` (top-level let bindings) at engine + /// load time and **not** mutated again. The runtime freezes them into a + /// shared `Arc<[Value]>` after `$init` finishes; only `$init` may emit a + /// `StoreGlobal` against the still-mutable pool. See `Instruction::StoreGlobal`. pub globals: Vec, /// Maps function names to their object indices. @@ -710,6 +716,8 @@ pub enum Object { Map(IndexMap), Future(Future), + /// Only used for requesting scheduling of a future, passed from VM to engine. + UnscheduledFuture(UnscheduledFuture), /// Opaque Rust-managed data, accessed via `Arc` downcast. /// Used for `$rust_type` fields in builtin classes (including media classes Pdf, Audio, Video, Image). @@ -783,10 +791,16 @@ impl std::fmt::Display for Object { Object::Type(ty) => write!(f, ""), Object::Future(future) => match future { Future::Pending(future) => { - write!(f, "", future.operation) + write!(f, "", future.id) } Future::Ready(value) => write!(f, ""), + Future::Error(value) => write!(f, ""), + Future::Cancelled => write!(f, ""), + Future::InternalError(future) => { + write!(f, "", future.id) + } }, + Object::UnscheduledFuture(future) => write!(f, "", future.operation), #[cfg(feature = "heap_debug")] Object::Sentinel(kind) => write!(f, ""), // Object::BamlType(type_ir) => write!(f, ""), @@ -798,25 +812,74 @@ impl std::fmt::Display for Object { pub enum Future { /// Pending future. /// - /// Only LLM calls for now. - Pending(PendingFuture), + /// In terms of synchronization, this is "pending" from the heap's point of view. + /// It will remain pending until set otherwise, but yielding back to the engine *could* see an immediate completion. + Pending(FutureId), /// Ready value for the future. Ready(Value), + + /// A BAML error or panic occurred while executing the future. + /// If awaited, the error/panic value will be thrown. + Error(Value), + + /// The future was cancelled before completion. + /// If awaited, this will throw `baml.panics.Cancelled`. + Cancelled, + + /// An unrecoverable internal error occurred while executing the future. + /// The originating `FutureId` is preserved so the VM can yield control back + /// to the engine on `Await`, allowing the engine to surface the underlying + /// error from the `FutureManager`'s registry. Such entries are leaked from + /// `FutureManager::active_futures` by design. + InternalError(FutureId), } -/// A pending external operation. +/// An operation that should be passed to the engine to be scheduled. /// /// External operations are async functions that run outside the VM, such as /// LLM calls, HTTP requests, file I/O, or shell commands. #[derive(Clone, Debug)] -pub struct PendingFuture { +pub struct UnscheduledFuture { /// The system operation to execute. pub operation: SysOp, /// Arguments to the operation. pub args: Vec, } +/// A unique identifier for a future. +/// +/// Unlike `bex_engine::CallId`, these are created for every scheduled future (sys op or function call), +/// not just when there is a new call from the host. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub struct FutureId { + id: usize, +} +impl FutureId { + /// Construct a [`FutureId`] from a raw `usize`. + /// + /// # Contract + /// + /// Each `FutureId` constructed for a given engine **must** have a `usize` + /// value distinct from every other live `FutureId` in that engine. The + /// engine satisfies this by issuing values from a monotonic + /// [`AtomicUsize`](::core::sync::atomic::AtomicUsize) counter inside its + /// `FutureManager`. + /// + /// Violating this contract does **not** cause memory unsafety, but it + /// causes `FutureManager` lookup collisions (two distinct futures sharing + /// the same map key, with all the silent data corruption that implies). + /// Outside of the engine and its tests, prefer calls that route through + /// `FutureManagerGuard::new_future` instead of constructing ids by hand. + pub fn from_usize(id: usize) -> Self { + Self { id } + } + + pub fn as_usize(self) -> usize { + self.id + } +} + /// Types of values. /// /// Used for checking type errors at runtime. We can probably use some lib @@ -878,6 +941,7 @@ pub enum ObjectType { Enum, Variant, Future(FutureType), + UnscheduledFuture, Collector, Type, RustData, @@ -902,6 +966,7 @@ impl ObjectType { Object::Collector(_) => Self::Collector, Object::Type(_) => Self::Type, Object::Future(fut) => Self::Future(fut.into()), + Object::UnscheduledFuture(_) => Self::UnscheduledFuture, #[cfg(feature = "heap_debug")] Object::Sentinel(_) => Self::Any, // Object::BamlType(_) => Self::Any, // TODO @@ -935,6 +1000,7 @@ impl std::fmt::Display for ObjectType { ObjectType::Enum => write!(f, "enum"), ObjectType::Variant => write!(f, "variant"), ObjectType::Future(future_type) => write!(f, "{future_type}"), + ObjectType::UnscheduledFuture => write!(f, "unscheduled_future"), ObjectType::String => write!(f, "string"), ObjectType::Uint8Array => write!(f, "uint8array"), ObjectType::Collector => write!(f, "collector"), @@ -978,6 +1044,21 @@ pub enum FutureType { Any, Pending, Ready, + Error, + Cancelled, + InternalError, +} + +impl FutureType { + pub fn of(future: &Future) -> Self { + match future { + Future::Pending(_) => Self::Pending, + Future::Ready(_) => Self::Ready, + Future::Error(_) => Self::Error, + Future::Cancelled => Self::Cancelled, + Future::InternalError(_) => Self::InternalError, + } + } } impl std::fmt::Display for FutureType { @@ -986,6 +1067,9 @@ impl std::fmt::Display for FutureType { FutureType::Any => write!(f, "any"), FutureType::Pending => write!(f, "pending"), FutureType::Ready => write!(f, "ready"), + FutureType::Error => write!(f, "error"), + FutureType::Cancelled => write!(f, "cancelled"), + FutureType::InternalError => write!(f, "internal_error"), } } } @@ -995,6 +1079,9 @@ impl From<&Future> for FutureType { match value { Future::Pending(_) => Self::Pending, Future::Ready(_) => Self::Ready, + Future::Error(_) => Self::Error, + Future::Cancelled => Self::Cancelled, + Future::InternalError(_) => Self::InternalError, } } } diff --git a/baml_language/crates/bridge_cffi/src/ffi/functions.rs b/baml_language/crates/bridge_cffi/src/ffi/functions.rs index 77de8ac854..43042ee9e6 100644 --- a/baml_language/crates/bridge_cffi/src/ffi/functions.rs +++ b/baml_language/crates/bridge_cffi/src/ffi/functions.rs @@ -63,7 +63,7 @@ fn runtime_error_to_string(err: &bex_project::RuntimeError) -> String { EngineError::FunctionNotFound { .. } => { format!("BamlError: BamlInvalidArgumentError: {engine_err}") } - EngineError::Cancelled => { + e if bex_project::is_cancelled_engine_error(e) => { format!("BamlError: BamlCancelledError: {engine_err}") } _ => format!("BamlError: BamlClientError: {engine_err}"), diff --git a/baml_language/crates/bridge_nodejs/src/errors.rs b/baml_language/crates/bridge_nodejs/src/errors.rs index d3e090b882..29b54213ba 100644 --- a/baml_language/crates/bridge_nodejs/src/errors.rs +++ b/baml_language/crates/bridge_nodejs/src/errors.rs @@ -79,7 +79,7 @@ pub fn runtime_error_to_napi(err: bex_project::RuntimeError) -> napi::Error { Status::InvalidArg, format!("BamlError: BamlInvalidArgumentError: {err}"), ), - EngineError::Cancelled => napi::Error::new( + e if bex_project::is_cancelled_engine_error(e) => napi::Error::new( Status::Cancelled, format!("BamlError: BamlCancelledError: {err}"), ), diff --git a/baml_language/crates/bridge_python/src/errors.rs b/baml_language/crates/bridge_python/src/errors.rs index 72b980e813..d4f3fe903b 100644 --- a/baml_language/crates/bridge_python/src/errors.rs +++ b/baml_language/crates/bridge_python/src/errors.rs @@ -84,7 +84,9 @@ pub fn runtime_error_to_py(err: bex_project::RuntimeError) -> PyErr { EngineError::FunctionNotFound { .. } => { PyErr::new::(err.to_string()) } - EngineError::Cancelled => PyErr::new::(err.to_string()), + e if bex_project::is_cancelled_engine_error(e) => { + PyErr::new::(err.to_string()) + } _ => PyErr::new::(err.to_string()), } } diff --git a/baml_language/crates/bridge_wasm/src/lib.rs b/baml_language/crates/bridge_wasm/src/lib.rs index 81cae5f8a3..98acf3f9c0 100644 --- a/baml_language/crates/bridge_wasm/src/lib.rs +++ b/baml_language/crates/bridge_wasm/src/lib.rs @@ -259,10 +259,7 @@ impl BamlWasmRuntime { // Handle cancellation error. let result = result.map_err(|e| -> JsValue { - if matches!( - e, - bex_project::RuntimeError::Engine(bex_project::EngineError::Cancelled) - ) { + if bex_project::is_cancelled_runtime_error(&e) { let err = js_sys::Error::new("Operation cancelled"); err.set_name("BamlCancelledError"); err.into() @@ -393,7 +390,7 @@ impl BamlWasmRuntime { .map_err(|e| JsError::new(&format!("Failed to encode result: {e}")))?; Ok(baml_value.encode_to_vec()) } - Err(bex_project::EngineError::Cancelled) => { + Err(e) if bex_project::is_cancelled_engine_error(&e) => { let error = js_sys::Error::new("Function call was cancelled"); error.set_name("BamlCancelledError"); Err(error.into()) diff --git a/baml_language/crates/sys_ops/src/lib.rs b/baml_language/crates/sys_ops/src/lib.rs index 459e4fa5f2..1b3e7ce81a 100644 --- a/baml_language/crates/sys_ops/src/lib.rs +++ b/baml_language/crates/sys_ops/src/lib.rs @@ -1329,6 +1329,7 @@ pub type SysOpsBuilder = IoSysOpsBuilder; #[cfg(test)] mod tests { + use bex_heap::HeapPermit; use bex_vm_types::SysOp; use super::*; diff --git a/baml_language/crates/sys_types/src/lib.rs b/baml_language/crates/sys_types/src/lib.rs index 533a37aea0..b9898b8eb6 100644 --- a/baml_language/crates/sys_types/src/lib.rs +++ b/baml_language/crates/sys_types/src/lib.rs @@ -92,7 +92,7 @@ impl std::fmt::Display for CallId { /// Errors that can occur during external operation execution. /// Every error is tied to the operation (`fn_name`) that was being called. -#[derive(Debug, PartialEq)] +#[derive(Debug, PartialEq, Clone)] pub struct OpError { pub fn_name: SysOp, pub kind: OpErrorKind, @@ -137,7 +137,7 @@ pub use bex_vm_types::{SysOpErrorCategory, SysOpPanicCategory}; // ============================================================================ /// Errors that can occur during external operation execution. -#[derive(Debug, PartialEq, thiserror::Error)] +#[derive(Debug, PartialEq, thiserror::Error, Clone)] pub enum OpErrorKind { #[error("Invalid number of arguments: expected {expected}, got {actual}")] InvalidArgumentCount { expected: usize, actual: usize }, diff --git a/baml_language/crates/tools_onionskin/src/compiler.rs b/baml_language/crates/tools_onionskin/src/compiler.rs index 6df10044de..b88a71f0b4 100644 --- a/baml_language/crates/tools_onionskin/src/compiler.rs +++ b/baml_language/crates/tools_onionskin/src/compiler.rs @@ -4994,6 +4994,7 @@ fn format_vm_value(value: &bex_vm_types::Value, vm: &bex_vm::BexVm) -> String { Object::Class(c) => format!("", c.name), Object::Enum(e) => format!("", e.name), Object::Future(_) => "".to_string(), + Object::UnscheduledFuture(_) => "".to_string(), Object::Collector(_) => "".to_string(), Object::Type(ty) => format!(""), Object::Closure(c) => format!("", c.captures.len()),