Merged
38 changes: 38 additions & 0 deletions Cross.toml
@@ -0,0 +1,38 @@
# Local cross-compile config for dec-bench harness tests.
# CI uses manylinux_2_28 directly; this is the convenience path for
# `cross build --target aarch64-unknown-linux-gnu -p moose-cli`.
[build.env]
# Tell openssl-sys to skip the vendored build (it strips deprecated symbols
# like SSL_get_peer_certificate that librdkafka still calls) and link the
# system OpenSSL 1.1 inside the container instead. OPENSSL_NO_VENDOR is the
# documented opt-out for the `vendored` feature — it keeps moose-cli's
# Cargo.toml unchanged so regular native builds keep vendoring.
passthrough = [
"OPENSSL_NO_VENDOR",
"OPENSSL_LIB_DIR",
"OPENSSL_INCLUDE_DIR",
"OPENSSL_STATIC",
]

[target.aarch64-unknown-linux-gnu]
# The pinned 0.2.5 image ships libssl 1.0.0, which lacks SSL_get_peer_certificate
# as an exported symbol and fails librdkafka's final link. `:main` is Ubuntu
# 20.04 with libssl 1.1.1, which has the symbol.
image = "ghcr.io/cross-rs/aarch64-unknown-linux-gnu:main"
# cross's /opt/toolchain.cmake pins CMAKE_FIND_ROOT_PATH to /usr/aarch64-linux-gnu
# (sysroot layout), but apt multiarch installs arm64 libs to
# /usr/lib/aarch64-linux-gnu (Debian layout). rdkafka-sys's cmake build of
# librdkafka does FIND_PACKAGE(ZLIB) / FIND_PACKAGE(OpenSSL) and can't see
# across that gap — so we symlink the multiarch lib/headers into the sysroot.
pre-build = [
"dpkg --add-architecture arm64",
"apt-get update",
"apt-get install -y --no-install-recommends protobuf-compiler libprotobuf-dev cmake pkg-config zlib1g-dev:arm64 libssl-dev:arm64",
"mkdir -p /usr/aarch64-linux-gnu/lib /usr/aarch64-linux-gnu/include",
"ln -sf /usr/lib/aarch64-linux-gnu/libz.so /usr/aarch64-linux-gnu/lib/libz.so",
"ln -sf /usr/lib/aarch64-linux-gnu/libz.a /usr/aarch64-linux-gnu/lib/libz.a",
"ln -sf /usr/include/zlib.h /usr/aarch64-linux-gnu/include/zlib.h",
"ln -sf /usr/include/zconf.h /usr/aarch64-linux-gnu/include/zconf.h",
"for f in libssl.so libssl.a libcrypto.so libcrypto.a; do ln -sf /usr/lib/aarch64-linux-gnu/$f /usr/aarch64-linux-gnu/lib/$f; done",
"ln -sfn /usr/include/openssl /usr/aarch64-linux-gnu/include/openssl",
]
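
For context, an illustrative Rust sketch of the opt-out mechanism this config leans on. This is not openssl-sys's actual build script, just the general shape of how a -sys crate's build.rs can honor an environment opt-out like OPENSSL_NO_VENDOR:

```rust
// Illustrative sketch only (not openssl-sys's real build script): how a
// -sys crate's build.rs can honor an env opt-out such as OPENSSL_NO_VENDOR
// by preferring system discovery over its bundled sources.
fn main() {
    // Re-run the build script if the opt-out flips between invocations.
    println!("cargo:rerun-if-env-changed=OPENSSL_NO_VENDOR");

    // Cargo exposes enabled features to build scripts as CARGO_FEATURE_*.
    let vendored_feature = std::env::var_os("CARGO_FEATURE_VENDORED").is_some();
    let opted_out = std::env::var_os("OPENSSL_NO_VENDOR").is_some();

    if vendored_feature && !opted_out {
        // Vendored path: compile the bundled OpenSSL sources.
        println!("cargo:warning=using vendored OpenSSL");
    } else {
        // System path: locate the library via OPENSSL_LIB_DIR /
        // OPENSSL_INCLUDE_DIR / pkg-config, which is what the
        // passthrough variables above feed into the container.
        println!("cargo:warning=linking system OpenSSL");
    }
}
```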
8 changes: 6 additions & 2 deletions apps/framework-cli-e2e/test/unloaded-files-warning.test.ts
@@ -71,9 +71,13 @@ describe("Unloaded Files Warning", () => {
let devProcess: ChildProcess | null = null;

afterEach(async () => {
await stopDevProcess(devProcess, { logger: testLogger, ports: PORTS });
devProcess = null;
// Pass the test's actual offset ports so killRemainingProcesses both
// targets them with SIGKILL and waits until they are released. Without
// this, ClickHouse's async shutdown still holds files in testDir when
// `rmSync` runs, producing ENOTEMPTY.
await killRemainingProcesses({ logger: testLogger, ports: PORTS });

if (testDir && fs.existsSync(testDir)) {
testLogger.debug("Cleaning up test directory", { testDir });
121 changes: 105 additions & 16 deletions apps/framework-cli/src/cli/local_webserver.rs
@@ -571,26 +571,111 @@ async fn get_consumption_api_res(
let _ = execute!(std::io::stdout(), Print(msg + "\n"));
}

// Capture method + headers up front so we can rebuild the reqwest::Request
// across retry attempts (reqwest::Request isn't Clone, so each attempt
// rebuilds one from these parts). Only GET requests are proxied here and
// they carry no body, so rebuild-per-attempt is cheap.
let method = req.method().clone();
let hdrs: Vec<(hyper::http::HeaderName, hyper::http::HeaderValue)> = req
.headers()
.iter()
.map(|(k, v)| (k.clone(), v.clone()))
.collect();
let url_parsed: reqwest::Url = url.parse()?;

// Retry only on genuine connect errors — these are the hot-reload window
// (consumption-api primary restarting → :proxy_port momentarily closed).
// Other failures (timeouts, TLS errors, 5xx from upstream) surface once.
const MAX_ATTEMPTS: usize = 3;
const BACKOFF_MS: [u64; 2] = [150, 400]; // applied between attempts 1→2 and 2→3

let mut last_connect_err: Option<reqwest::Error> = None;
for attempt in 0..MAX_ATTEMPTS {
let mut client_req = reqwest::Request::new(method.clone(), url_parsed.clone());
let req_headers = client_req.headers_mut();
for (k, v) in hdrs.iter() {
req_headers.insert(k, v.clone());
}

match http_client.execute(client_req).await {
Ok(res) => {
let status = res.status();
let body = res.bytes().await?;
return Ok(add_cors_headers(Response::builder())
.status(status)
.header("Content-Type", "application/json")
.body(Full::new(body))
.unwrap());
}
Err(e) if is_connect_error(&e) => {
debug!(
"consumption proxy connect error on attempt {}/{}: {}",
attempt + 1,
MAX_ATTEMPTS,
e
);
last_connect_err = Some(e);
if let Some(&ms) = BACKOFF_MS.get(attempt) {
tokio::time::sleep(std::time::Duration::from_millis(ms)).await;
continue;
}
// No more attempts left.
break;
}
Err(e) => {
// Non-connect error: bail immediately, preserving existing
// behavior (caller turns this into 500 "Error").
return Err(e.into());
}
}
}

// All retries exhausted with connect errors — return a structured 503 so
// agents can reason about this as a transient hot-reload window instead
// of an ambiguous 500. Retry-After tells well-behaved clients to try
// again shortly.
let body = serde_json::json!({
"error": "consumption_api_unavailable",
"retryable": true,
"message": format!(
"Consumption API is temporarily unavailable (restarting on hot reload). \
Retry shortly. Attempts: {}/{}.",
MAX_ATTEMPTS, MAX_ATTEMPTS
),
"upstream_error": last_connect_err.as_ref().map(|e| e.to_string()),
});
Ok(add_cors_headers(Response::builder())
.status(StatusCode::SERVICE_UNAVAILABLE)
.header("Content-Type", "application/json")
.header("Retry-After", "1")
.body(Full::new(Bytes::from(body.to_string())))?)
}

/// Classify a reqwest error as a pure connect-failure (server not accepting
/// connections) vs anything else. Connect errors are the hot-reload race
/// signature and the only case we want to retry — timeouts / TLS / upstream
/// 5xx are surfaced directly.
fn is_connect_error(e: &reqwest::Error) -> bool {
use std::error::Error as _;

if e.is_connect() {
return true;
}
// reqwest wraps hyper wraps std::io::Error; walk the source chain.
let mut src: Option<&(dyn std::error::Error + 'static)> = e.source();
while let Some(err) = src {
if let Some(io_err) = err.downcast_ref::<std::io::Error>() {
use std::io::ErrorKind::*;
if matches!(
io_err.kind(),
ConnectionRefused | ConnectionReset | ConnectionAborted | NotConnected
) {
return true;
}
}
src = err.source();
}
false
}

#[derive(Clone)]
@@ -2698,7 +2783,11 @@ impl Webserver {
.await
.unwrap_or_else(|e| handle_listener_err(management_socket.port(), e));

// Defense in depth: quick bind-and-drop on proxy_port so Docker-mode
// users get a clear error before the Node worker spawns. The
// --dockerless path also runs a structured preflight in
// NativeInfraProvider::start that covers this port alongside the
// embedded-infra ports.
let proxy_socket = self.get_socket(project.http_server_config.proxy_port).await;
TcpListener::bind(proxy_socket)
.await
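For reference, a self-contained sketch of the attempt/backoff indexing the retry loop above uses (hypothetical helper and names; the real loop lives inline in get_consumption_api_res). The backoff table has one fewer entry than the attempt count, so sleeps occur only between attempts, never after the last one:

```rust
use std::time::Duration;

// Sketch of the same attempt/backoff shape as the proxy retry loop.
fn retry_with_backoff<T, E>(
    max_attempts: usize,
    backoff_ms: &[u64],
    mut op: impl FnMut() -> Result<T, E>,
) -> Result<T, E> {
    let mut last_err = None;
    for attempt in 0..max_attempts {
        match op() {
            Ok(v) => return Ok(v),
            Err(e) => {
                last_err = Some(e);
                // backoff_ms has max_attempts - 1 entries: indexing by
                // `attempt` sleeps between attempts and falls through
                // silently after the final failure.
                if let Some(&ms) = backoff_ms.get(attempt) {
                    std::thread::sleep(Duration::from_millis(ms));
                }
            }
        }
    }
    // Assumes max_attempts >= 1, so at least one Err was recorded.
    Err(last_err.unwrap())
}

fn main() {
    let mut tries = 0;
    let result: Result<u32, &str> = retry_with_backoff(3, &[150, 400], || {
        tries += 1;
        if tries < 3 { Err("connect refused") } else { Ok(42) }
    });
    assert_eq!(result, Ok(42));
}
```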
@@ -1,7 +1,10 @@
use std::collections::HashMap;
use std::net::TcpListener;
use std::path::PathBuf;
use std::thread;
use std::time::{Duration, Instant};

use tracing::{info, instrument, warn};

use crate::cli::logger::{context, resource_type};
use crate::utilities::system::{RestartPolicy, RestartingProcess, StartChildFn};
@@ -78,6 +81,20 @@ impl ConsumptionProcessRegistry {
let jwt_config = self.jwt_config.clone();
let proxy_port = self.proxy_port;

// Hot-reload race mitigation: when ConsumptionApiWebServer::Updated
// fires, we call stop() then start() back-to-back. The prior Node
// primary's workers drain connections asynchronously, so :proxy_port
// may still be held by the kernel when the new primary tries to
// fork workers — every new worker's listen() then fails EADDRINUSE
// and the cluster enters a retry storm.
//
// Here we wait until we can successfully bind the port ourselves
// (releasing it immediately after). That proves the old listener is
// gone, so the next process's listen() will succeed cleanly.
if let Some(port) = proxy_port {
wait_for_port_free("127.0.0.1", port);
}

let start_child: StartChildFn<ConsumptionError> = match self.language {
SupportedLanguages::Python => Box::new(move || {
python::consumption::run(
@@ -132,3 +149,33 @@
Ok(())
}
}

/// Block (briefly) until `host:port` is bindable, i.e. any prior listener
/// has fully released it. Polls every 100ms up to 5s. Non-fatal: on timeout
/// we log a warning and return so the caller can still attempt the spawn —
/// the Node-side `server.on('error')` handler will catch a lingering
/// EADDRINUSE and the cluster's restart path will recover.
fn wait_for_port_free(host: &str, port: u16) {
const INTERVAL: Duration = Duration::from_millis(100);
const TIMEOUT: Duration = Duration::from_secs(5);

let deadline = Instant::now() + TIMEOUT;
loop {
// A successful bind here proves the port is free. The TcpListener is
// dropped at the end of this scope, releasing the port for the real
// owner to grab a few ms later. The probe never accepts a connection,
// so it leaves no lingering socket state behind and the subsequent
// bind by the Node worker won't be blocked by our brief hold.
match TcpListener::bind((host, port)) {
Ok(_) => return,
Err(_) if Instant::now() < deadline => thread::sleep(INTERVAL),
Err(e) => {
warn!(
"consumption-api: port {port} still held after {TIMEOUT:?} ({e}); \
continuing — Node-side error handler will manage the retry"
);
return;
}
}
}
}
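A test-style sketch of the probe's semantics, assuming wait_for_port_free is in scope: the call blocks while a prior listener holds the port and returns as soon as it is dropped.

```rust
#[cfg(test)]
mod tests {
    use super::wait_for_port_free;
    use std::net::TcpListener;
    use std::{thread, time::Duration};

    #[test]
    fn returns_once_prior_listener_is_dropped() {
        // Hold a port the way a draining Node primary would.
        let holder = TcpListener::bind(("127.0.0.1", 0)).unwrap();
        let port = holder.local_addr().unwrap().port();

        // Release it shortly after, simulating async shutdown completing.
        thread::spawn(move || {
            thread::sleep(Duration::from_millis(300));
            drop(holder);
        });

        // Blocks through the ~300ms hold, then binds successfully and
        // returns well inside the 5s timeout.
        wait_for_port_free("127.0.0.1", port);
        assert!(TcpListener::bind(("127.0.0.1", port)).is_ok());
    }
}
```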
14 changes: 14 additions & 0 deletions apps/framework-cli/src/utilities/native_infra/clickhouse.rs
@@ -9,6 +9,20 @@ use tracing::info;
/// Data directory layout under `{project}/.moose/native_infra/clickhouse/`.
const NATIVE_CH_DIR: &str = "native_infra/clickhouse";

/// `ps -o comm=` value for the ClickHouse watchdog process.
///
/// On Linux the watchdog calls `prctl(PR_SET_NAME, "clckhouse-watch")` at
/// startup — the 'i' is dropped so the name fits the 15-char
/// `TASK_COMM_LEN`. On macOS no such rename happens and `ps` reports
/// `clickhouse` for the spawned binary. Persisting the platform-appropriate
/// string in the PID file lets `process_matches` confirm identity before
/// sending SIGTERM on shutdown on both targets.
#[cfg(target_os = "linux")]
pub const WATCHDOG_COMM: &str = "clckhouse-watch";

#[cfg(not(target_os = "linux"))]
pub const WATCHDOG_COMM: &str = "clickhouse";

/// Ensure the ClickHouse binary is cached and return its path.
pub fn ensure_binary(manager: &BinaryManager) -> Result<PathBuf, NativeInfraError> {
let (url, archive_path, expected_sha256) = clickhouse_download_metadata();
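For illustration, a hedged sketch of the identity check this constant feeds. The real process_matches helper lives elsewhere in the crate and may differ; the idea is to compare `ps -o comm=` output for the stored PID against the persisted name before signaling:

```rust
use std::process::Command;

// Hypothetical process_matches-style check (not the crate's real helper).
// Returns true only if the PID is alive and its comm matches the name
// persisted in the PID file (e.g. WATCHDOG_COMM), guarding against
// SIGTERM'ing a recycled PID.
fn process_matches(pid: u32, expected_comm: &str) -> bool {
    let pid_arg = pid.to_string();
    let output = match Command::new("ps")
        .args(["-o", "comm=", "-p", pid_arg.as_str()])
        .output()
    {
        Ok(o) if o.status.success() => o,
        _ => return false, // PID gone or ps unavailable
    };
    let comm = String::from_utf8_lossy(&output.stdout);
    // On Linux comm is truncated to TASK_COMM_LEN (15 chars), which is
    // exactly why the persisted value is "clckhouse-watch" there.
    comm.trim() == expected_comm
}

fn main() {
    // Prints false unless the current binary happens to be named
    // "clickhouse"; shown here only to exercise the helper.
    println!("{}", process_matches(std::process::id(), "clickhouse"));
}
```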
3 changes: 3 additions & 0 deletions apps/framework-cli/src/utilities/native_infra/errors.rs
@@ -100,4 +100,7 @@ pub enum NativeInfraError {

#[error("health check failed for {service}: {reason}")]
HealthCheck { service: String, reason: String },

#[error("{0}")]
PortConflict(#[from] super::preflight::PortConflictError),
}
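A minimal self-contained sketch of what the #[from] wiring buys, using hypothetical stand-ins for the real types: `?` applies the generated From impl, so the preflight error folds into the enum with no explicit match.

```rust
use thiserror::Error;

// Hypothetical stand-ins mirroring the real types, to show the #[from]
// conversion used by `check_ports(...).map_err(NativeInfraError::from)`.
#[derive(Debug, Error)]
#[error("port {port} is already in use")]
struct PortConflictError {
    port: u16,
}

#[derive(Debug, Error)]
enum NativeInfraError {
    #[error("{0}")]
    PortConflict(#[from] PortConflictError),
}

fn check_ports(port: u16) -> Result<(), PortConflictError> {
    Err(PortConflictError { port })
}

fn start() -> Result<(), NativeInfraError> {
    // `?` invokes From<PortConflictError> generated by #[from].
    check_ports(4001)?;
    Ok(())
}

fn main() {
    println!("{}", start().unwrap_err()); // "port 4001 is already in use"
}
```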
63 changes: 58 additions & 5 deletions apps/framework-cli/src/utilities/native_infra/mod.rs
@@ -3,6 +3,7 @@ pub mod clickhouse;
pub mod devkafka;
pub mod devredis;
pub mod errors;
pub mod preflight;
pub mod temporal;

use crate::cli::display::{with_spinner_completion, with_timing, Message};
@@ -17,9 +18,9 @@ use std::path::Path;
use std::sync::atomic::Ordering;
use std::sync::{Arc, Mutex, OnceLock};
use std::thread::sleep;
use std::time::{Duration, Instant};
use tokio::runtime::Handle;
use tracing::{info, warn};

/// Relative path from project root to the native infrastructure directory.
pub const NATIVE_INFRA_DIR: &str = ".moose/native_infra";
@@ -126,6 +127,19 @@ impl InfraProvider for NativeInfraProvider {
}

fn start(&self, project: &Project) -> Result<(), RoutineFailure> {
// Preflight: surface EADDRINUSE in a single actionable message before
// anything starts. Prevents the Node consumption worker from entering
// an unbounded restart loop when a prior `moose dev --dockerless` is
// still holding ports 4001 / 6379 / 19092.
let specs = preflight::port_specs_for(
project,
self.scripts_enabled,
/* include_webserver = */ true,
);
preflight::check_ports(&specs, &preflight::native_dir_for(project))
.map_err(NativeInfraError::from)
.map_err(Self::map_native_err)?;

// Start embedded devredis (Redis needed early for leadership/presence)
let devredis_handle = with_timing("Start devredis", || {
with_spinner_completion(
@@ -202,9 +216,17 @@ impl InfraProvider for NativeInfraProvider {
let _ = child.start_kill();
anyhow::anyhow!("ClickHouse process exited immediately after spawn")
})?;
// ClickHouse's watchdog sets its own comm via prctl(PR_SET_NAME)
// to `clckhouse-watch` (missing the 'i', trimmed to fit the
// 15-char TASK_COMM_LEN). The PID we capture here is the
// watchdog — not the server child — so store the name that
// `ps -o comm=` will actually report so `process_matches`
// can later verify identity and issue SIGTERM on shutdown.
if let Err(e) = write_pid_file(
&clickhouse::pid_file_path(project),
pid,
clickhouse::WATCHDOG_COMM,
) {
let _ = child.start_kill();
return Err(anyhow::anyhow!("{}", e));
}
@@ -489,12 +511,43 @@ pub fn kill_pid_file(pid_path: &Path) {
}
Ok(_) => {
info!("PID {pid} already exited or could not be signaled");
let _ = std::fs::remove_file(pid_path);
return;
}
Err(e) => {
info!("Failed to run kill command for PID {pid}: {e}");
let _ = std::fs::remove_file(pid_path);
return;
}
}

// ClickHouse takes several seconds to flush and release its listen sockets;
// without waiting here the next `moose dev --dockerless` preflight sees the
// ports still bound and aborts with a false "another moose dev is running"
// error. Poll `kill -0` until the PID is gone, then escalate to SIGKILL.
const WAIT_TIMEOUT: Duration = Duration::from_secs(10);
const POLL_INTERVAL: Duration = Duration::from_millis(100);
let deadline = Instant::now() + WAIT_TIMEOUT;
while Instant::now() < deadline {
let still_alive = std::process::Command::new("kill")
.args(["-0", &pid.to_string()])
.output()
.map(|o| o.status.success())
.unwrap_or(false);
if !still_alive {
info!("PID {pid} exited after SIGTERM");
let _ = std::fs::remove_file(pid_path);
return;
}
sleep(POLL_INTERVAL);
}

warn!("PID {pid} did not exit within {WAIT_TIMEOUT:?}; sending SIGKILL");
let _ = std::process::Command::new("kill")
.args(["-KILL", &pid.to_string()])
.output();
// Brief grace for the kernel to release ports held by the killed process.
sleep(Duration::from_millis(200));
let _ = std::fs::remove_file(pid_path);
}

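The preflight module itself is not shown in this diff; the following is an assumption-level sketch of what a bind-probe check_ports could look like, aggregating every busy port into one actionable error rather than failing mid-startup:

```rust
use std::net::TcpListener;

// Assumption-level sketch: the real preflight::check_ports is not shown
// in this diff. A bind-and-drop probe over each configured port,
// collecting conflicts into a single message.
struct PortSpec {
    service: &'static str,
    port: u16,
}

fn check_ports(specs: &[PortSpec]) -> Result<(), String> {
    let conflicts: Vec<String> = specs
        .iter()
        .filter(|s| TcpListener::bind(("127.0.0.1", s.port)).is_err())
        .map(|s| format!("{} (port {})", s.service, s.port))
        .collect();

    if conflicts.is_empty() {
        Ok(())
    } else {
        // One message naming every busy port, e.g. when a prior
        // `moose dev --dockerless` is still holding 4001/6379/19092.
        Err(format!(
            "ports already in use: {}; is another moose dev running?",
            conflicts.join(", ")
        ))
    }
}

fn main() {
    let specs = [
        PortSpec { service: "consumption-api", port: 4001 },
        PortSpec { service: "devredis", port: 6379 },
        PortSpec { service: "devkafka", port: 19092 },
    ];
    match check_ports(&specs) {
        Ok(()) => println!("preflight ok"),
        Err(e) => eprintln!("{e}"),
    }
}
```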