Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions baml_language/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions baml_language/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ wasm-logger = "0.2.0"
web-time = "1.1.0"
getrandom = { version = "0.2" }
rsa = { version = "0.9", default-features = false, features = [ "std", "pem" ] }
hex = { version = "0.4" }
sha2 = { version = "0.10", features = [ "oid" ] }

[workspace.lints.rust]
Expand Down
66 changes: 60 additions & 6 deletions baml_language/crates/baml_lsp_server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,21 @@ pub mod playground_sender;
pub mod playground_server;
pub mod playground_ws;

use std::sync::Arc;
use std::{
collections::HashMap,
sync::{Arc, RwLock, atomic::AtomicU64},
};

use playground_env::{PlaygroundEnv, PlaygroundEnvState};
use playground_http::{PlaygroundHttp, PlaygroundHttpState};
use playground_ws::WsOutMessage;
use sys_ops::replay::{RecordReplay, RecordedResponse, ReplayHttp, RequestKey};
use tokio::net::TcpListener;

/// Per-project replay store map, shared between the sys_op_factory and the WS server.
pub type ReplayStoreMap =
Arc<std::sync::Mutex<HashMap<String, Arc<RwLock<RecordReplay<RequestKey, RecordedResponse>>>>>>;

pub fn version() -> &'static str {
env!("CARGO_PKG_VERSION")
}
Expand All @@ -58,13 +66,48 @@ pub fn version() -> &'static str {
fn build_playground_sys_ops(
broadcast_tx: &tokio::sync::broadcast::Sender<WsOutMessage>,
env_state: &Arc<PlaygroundEnvState>,
replay_store: Arc<RwLock<RecordReplay<RequestKey, RecordedResponse>>>,
) -> sys_ops::SysOps {
let http_state = Arc::new(PlaygroundHttpState::new(broadcast_tx.clone()));
let fetch_id_alloc = Arc::new(AtomicU64::new(1));
let http_state = Arc::new(PlaygroundHttpState::new(
broadcast_tx.clone(),
fetch_id_alloc.clone(),
));

let broadcast_tx_for_replay = broadcast_tx.clone();
let on_replay: Arc<dyn Fn(sys_ops::replay::ReplayFetchEvent) + Send + Sync> =
Arc::new(move |event: sys_ops::replay::ReplayFetchEvent| {
let _ = broadcast_tx_for_replay.send(WsOutMessage::FetchLogNew {
call_id: event.call_id,
id: event.fetch_id,
method: event.method,
url: event.url,
request_headers: event.request_headers,
request_body: event.request_body,
replayed: Some(true),
});
let _ = broadcast_tx_for_replay.send(WsOutMessage::FetchLogUpdate {
call_id: event.call_id,
log_id: event.fetch_id,
status: Some(event.status),
duration_ms: Some(event.duration_ms),
response_body: Some(event.response_body),
error: None,
response_headers: Some(event.response_headers),
});
});

let replay = ReplayHttp::new(
Arc::new(PlaygroundHttp(http_state)),
replay_store,
fetch_id_alloc,
Some(on_replay),
);
sys_ops::SysOpsBuilder::new()
.with_fs::<sys_native::NativeSysOps>()
.with_sys::<sys_native::NativeSysOps>()
.with_net::<sys_native::NativeSysOps>()
.with_http_instance(Arc::new(PlaygroundHttp(http_state)))
.with_http_instance(Arc::new(replay))
.with_env_instance(Arc::new(PlaygroundEnv(env_state.clone())))
.build()
}
Expand Down Expand Up @@ -98,15 +141,24 @@ pub fn run_server(playground_via_browser: bool) -> anyhow::Result<()> {
let env_state = Arc::new(PlaygroundEnvState::new(broadcast_tx.clone()));

// Build SysOps with playground interception.
// The factory creates the same ops for every project.
// The factory creates per-project ops with replay stores.
let replay_stores: ReplayStoreMap = Arc::new(std::sync::Mutex::new(HashMap::new()));
let replay_stores_for_factory = replay_stores.clone();

let broadcast_tx_for_factory = broadcast_tx.clone();
let env_state_for_factory = env_state.clone();
#[allow(clippy::type_complexity)]
let sys_op_factory: Arc<dyn Fn(&vfs::VfsPath) -> Arc<sys_ops::SysOps> + Send + Sync> =
Arc::new(move |_path: &vfs::VfsPath| {
Arc::new(move |path: &vfs::VfsPath| {
let replay_store = Arc::new(RwLock::new(RecordReplay::new()));
replay_stores_for_factory
.lock()
.unwrap()
.insert(path.as_str().to_string(), replay_store.clone());
Arc::new(build_playground_sys_ops(
&broadcast_tx_for_factory,
&env_state_for_factory,
replay_store,
))
});

Expand Down Expand Up @@ -160,8 +212,10 @@ pub fn run_server(playground_via_browser: bool) -> anyhow::Result<()> {
let bex_for_playground = bex.clone();
let btx = broadcast_tx.clone();
let es = env_state.clone();
let rs = replay_stores.clone();
tokio_runtime.spawn(async move {
if let Err(e) = playground_server::run(listener, bex_for_playground, btx, es).await {
if let Err(e) = playground_server::run(listener, bex_for_playground, btx, es, rs).await
{
tracing::error!("Playground server exited: {e}");
}
});
Expand Down
12 changes: 8 additions & 4 deletions baml_language/crates/baml_lsp_server/src/playground_http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,19 @@ use crate::playground_ws::WsOutMessage;
/// Shared state for the HTTP interceptor.
pub struct PlaygroundHttpState {
broadcast_tx: broadcast::Sender<WsOutMessage>,
next_fetch_id: AtomicU64,
next_fetch_id: Arc<AtomicU64>,
/// Maps response body pointer → (call_id, fetch_id) for response_text tracking.
response_to_fetch: std::sync::Mutex<HashMap<usize, (u64, u64)>>,
pub response_to_fetch: std::sync::Mutex<HashMap<usize, (u64, u64)>>,
}
Comment on lines 22 to 27
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion | 🟠 Major

Keep response_to_fetch out of the public API.

Exposing a mutable Mutex<HashMap<...>> field makes this fetch/body correlation invariant externally mutable. If the replay wiring only needs it inside baml_lsp_server, pub(crate) or a small accessor keeps the API surface tighter.

🔒 Suggested change
 pub struct PlaygroundHttpState {
     broadcast_tx: broadcast::Sender<WsOutMessage>,
     next_fetch_id: Arc<AtomicU64>,
     /// Maps response body pointer → (call_id, fetch_id) for response_text tracking.
-    pub response_to_fetch: std::sync::Mutex<HashMap<usize, (u64, u64)>>,
+    pub(crate) response_to_fetch: std::sync::Mutex<HashMap<usize, (u64, u64)>>,
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
pub struct PlaygroundHttpState {
broadcast_tx: broadcast::Sender<WsOutMessage>,
next_fetch_id: AtomicU64,
next_fetch_id: Arc<AtomicU64>,
/// Maps response body pointer → (call_id, fetch_id) for response_text tracking.
response_to_fetch: std::sync::Mutex<HashMap<usize, (u64, u64)>>,
pub response_to_fetch: std::sync::Mutex<HashMap<usize, (u64, u64)>>,
}
pub struct PlaygroundHttpState {
broadcast_tx: broadcast::Sender<WsOutMessage>,
next_fetch_id: Arc<AtomicU64>,
/// Maps response body pointer → (call_id, fetch_id) for response_text tracking.
pub(crate) response_to_fetch: std::sync::Mutex<HashMap<usize, (u64, u64)>>,
}


impl PlaygroundHttpState {
pub fn new(broadcast_tx: broadcast::Sender<WsOutMessage>) -> Self {
pub fn new(
broadcast_tx: broadcast::Sender<WsOutMessage>,
fetch_id_allocator: Arc<AtomicU64>,
) -> Self {
Self {
broadcast_tx,
next_fetch_id: AtomicU64::new(1),
next_fetch_id: fetch_id_allocator,
response_to_fetch: std::sync::Mutex::new(HashMap::new()),
}
}
Expand Down Expand Up @@ -113,6 +116,7 @@ impl io::IoNamespaceHttp for PlaygroundHttp {
url: request.url.clone(),
request_headers: extract_headers_as_hashmap(&request.headers),
request_body: request.body.clone(),
replayed: None,
});

let native_result = <sys_native::NativeSysOps as io::IoNamespaceHttp>::send(
Expand Down
49 changes: 48 additions & 1 deletion baml_language/crates/baml_lsp_server/src/playground_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,14 @@ pub async fn pick_port(base_port: u16, max_attempts: u16) -> anyhow::Result<(Tcp
// Shared state for Axum handlers
// ---------------------------------------------------------------------------

use crate::ReplayStoreMap;

#[derive(Clone)]
struct WsState {
bex: Arc<dyn bex_project::BexLsp>,
broadcast_tx: broadcast::Sender<WsOutMessage>,
env_state: Arc<PlaygroundEnvState>,
replay_stores: ReplayStoreMap,
}

/// Start the playground server on the given listener.
Expand All @@ -76,8 +79,9 @@ pub async fn run(
bex: Arc<dyn bex_project::BexLsp>,
broadcast_tx: broadcast::Sender<WsOutMessage>,
env_state: Arc<PlaygroundEnvState>,
replay_stores: ReplayStoreMap,
) -> anyhow::Result<()> {
let app = build_router(bex, broadcast_tx, env_state)?;
let app = build_router(bex, broadcast_tx, env_state, replay_stores)?;

tracing::info!(
"Playground: http://localhost:{}",
Expand All @@ -93,11 +97,13 @@ fn build_router(
bex: Arc<dyn bex_project::BexLsp>,
broadcast_tx: broadcast::Sender<WsOutMessage>,
env_state: Arc<PlaygroundEnvState>,
replay_stores: ReplayStoreMap,
) -> anyhow::Result<Router> {
let ws_state = WsState {
bex,
broadcast_tx,
env_state,
replay_stores,
};

let api = Router::new()
Expand Down Expand Up @@ -419,6 +425,47 @@ async fn handle_ws_in_message(
tracing::warn!("Failed to send cursor context");
}
}

WsInMessage::ToggleReplay {
project,
fetch_id,
pinned,
} => {
let stores = state.replay_stores.lock().unwrap();
if let Some(store) = stores.get(&project) {
store.write().unwrap().set_pinned(fetch_id, pinned);
} else {
tracing::warn!("ToggleReplay: no replay store for project {project}");
}
Comment on lines +429 to +439
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Handle unknown fetch_ids explicitly.

set_pinned() already tells you when the requested recording is missing. Ignoring that return value turns stale replay-manager state into a silent no-op.

💡 Suggested change
         WsInMessage::ToggleReplay {
             project,
             fetch_id,
             pinned,
         } => {
             let stores = state.replay_stores.lock().unwrap();
             if let Some(store) = stores.get(&project) {
-                store.write().unwrap().set_pinned(fetch_id, pinned);
+                if !store.write().unwrap().set_pinned(fetch_id, pinned) {
+                    tracing::warn!("ToggleReplay: unknown fetch_id {fetch_id} for project {project}");
+                }
             } else {
                 tracing::warn!("ToggleReplay: no replay store for project {project}");
             }
         }

}

WsInMessage::RequestReplayState { project } => {
// Collect entries while holding the lock, then drop it before await.
let entries: Vec<serde_json::Value> = {
let stores = state.replay_stores.lock().unwrap();
if let Some(store) = stores.get(&project) {
let store = store.read().unwrap();
store
.snapshot()
.into_iter()
.map(|g| serde_json::to_value(g).unwrap())
.collect()
} else {
vec![]
}
};
// Send via sink (per-session), not broadcast.
if let Err(e) = sink
.send(axum::extract::ws::Message::Text(
serde_json::to_string(&WsOutMessage::ReplayState { entries })
.unwrap()
.into(),
))
.await
{
tracing::warn!("Failed to send ReplayState: {e}");
}
}
}
}

Expand Down
13 changes: 13 additions & 0 deletions baml_language/crates/baml_lsp_server/src/playground_ws.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,15 @@ pub enum WsInMessage {
line: u32,
column: u32,
},
#[serde(rename = "toggleReplay")]
ToggleReplay {
project: String,
#[serde(rename = "fetchId")]
fetch_id: u64,
pinned: bool,
},
#[serde(rename = "requestReplayState")]
RequestReplayState { project: String },
}

// ---------------------------------------------------------------------------
Expand Down Expand Up @@ -99,6 +108,8 @@ pub enum WsOutMessage {
request_headers: std::collections::HashMap<String, String>,
#[serde(rename = "requestBody")]
request_body: String,
#[serde(skip_serializing_if = "Option::is_none")]
replayed: Option<bool>,
},
#[serde(rename = "fetchLogUpdate")]
FetchLogUpdate {
Expand All @@ -125,4 +136,6 @@ pub enum WsOutMessage {
},
#[serde(rename = "cursorContext")]
CursorContext { context: serde_json::Value },
#[serde(rename = "replayState")]
ReplayState { entries: Vec<serde_json::Value> },
}
Loading
Loading