Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 50 additions & 0 deletions Cross.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
# Local cross-compile config for dec-bench harness tests.
# CI uses manylinux_2_28 directly; this is the convenience path for
# `cross build --target aarch64-unknown-linux-gnu -p moose-cli`.
[build.env]
# Tell openssl-sys to skip the vendored build (it strips deprecated symbols
# like SSL_get_peer_certificate that librdkafka still calls) and link the
# system OpenSSL 1.1 inside the container instead. OPENSSL_NO_VENDOR is the
# documented opt-out for the `vendored` feature — it keeps moose-cli's
# Cargo.toml unchanged so regular native builds keep vendoring.
#
# CC / CXX / AR are passthrough because rdkafka-sys runs librdkafka's
# autoconf `./configure` as a subprocess, which reads bare `$CC` from env
# rather than cargo's target-qualified `CC_aarch64_unknown_linux_gnu`.
# Without them configure defaults to the host `gcc` (x86_64) and ld fails
# with `cannot find -lcrypto` because only arm64 libs are in the sysroot.
# Invoke like:
# CC=aarch64-linux-gnu-gcc CXX=aarch64-linux-gnu-g++ AR=aarch64-linux-gnu-ar \
# OPENSSL_NO_VENDOR=1 cross build --target aarch64-unknown-linux-gnu -p moose-cli
passthrough = [
"OPENSSL_NO_VENDOR",
"OPENSSL_LIB_DIR",
"OPENSSL_INCLUDE_DIR",
"OPENSSL_STATIC",
"CC",
"CXX",
"AR",
]

[target.aarch64-unknown-linux-gnu]
# The pinned 0.2.5 image ships libssl 1.0.0, which lacks SSL_get_peer_certificate
# as an exported symbol and fails librdkafka's final link. `:main` is Ubuntu
# 20.04 with libssl 1.1.1, which has the symbol.
image = "ghcr.io/cross-rs/aarch64-unknown-linux-gnu:main"
# (review thread on this image pin was resolved)
# cross's /opt/toolchain.cmake pins CMAKE_FIND_ROOT_PATH to /usr/aarch64-linux-gnu
# (sysroot layout), but apt multiarch installs arm64 libs to
# /usr/lib/aarch64-linux-gnu (Debian layout). rdkafka-sys's cmake build of
# librdkafka does FIND_PACKAGE(ZLIB) / FIND_PACKAGE(OpenSSL) and can't see
# across that gap — so we symlink the multiarch lib/headers into the sysroot.
pre-build = [
"dpkg --add-architecture arm64",
"apt-get update",
"apt-get install -y --no-install-recommends protobuf-compiler libprotobuf-dev cmake pkg-config zlib1g-dev:arm64 libssl-dev:arm64",
"mkdir -p /usr/aarch64-linux-gnu/lib /usr/aarch64-linux-gnu/include",
"ln -sf /usr/lib/aarch64-linux-gnu/libz.so /usr/aarch64-linux-gnu/lib/libz.so",
"ln -sf /usr/lib/aarch64-linux-gnu/libz.a /usr/aarch64-linux-gnu/lib/libz.a",
"ln -sf /usr/include/zlib.h /usr/aarch64-linux-gnu/include/zlib.h",
"ln -sf /usr/include/zconf.h /usr/aarch64-linux-gnu/include/zconf.h",
"for f in libssl.so libssl.a libcrypto.so libcrypto.a; do ln -sf /usr/lib/aarch64-linux-gnu/$f /usr/aarch64-linux-gnu/lib/$f; done",
"ln -sfn /usr/include/openssl /usr/aarch64-linux-gnu/include/openssl",
    # (review thread on these symlinks was resolved)
]
8 changes: 8 additions & 0 deletions apps/framework-cli-e2e/test/s3-engine.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,12 @@ if (!SELECTED_LANGUAGE || SELECTED_LANGUAGE === "ts") {
APP_NAMES.TYPESCRIPT_TESTS,
{
logPrefix: "TypeScript S3 Engine Test (With Env Vars)",
// Pass the offset-50 ports so `killRemainingProcesses` can
// SIGKILL leftover ClickHouse/Node consumption workers by port
// instead of targeting the default (offset-0) ports. The
// dockerless preflight in the next describe block depends on
// those ports being free.
ports: PORTS,
...getCleanupOptionsForMode(E2E_DEV_MODE),
},
);
Expand Down Expand Up @@ -194,6 +200,8 @@ if (!SELECTED_LANGUAGE || SELECTED_LANGUAGE === "py") {
APP_NAMES.PYTHON_TESTS,
{
logPrefix: "Python S3 Engine Test (With Env Vars)",
// See note on the TypeScript describe block above.
ports: PORTS,
...getCleanupOptionsForMode(E2E_DEV_MODE),
},
);
Expand Down
8 changes: 8 additions & 0 deletions apps/framework-cli-e2e/test/s3-secrets.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,12 @@ if (!SELECTED_LANGUAGE || SELECTED_LANGUAGE === "ts") {
APP_NAMES.TYPESCRIPT_TESTS,
{
logPrefix: "TypeScript S3Queue Test (With Env Vars)",
// Pass the test's actual port set so `killRemainingProcesses` can
// SIGKILL leftover ClickHouse/Node consumption workers on the
// offset-40 ports. Without this it would target the default
// (offset-0) ports and leave this suite's children alive,
// causing the next describe block's preflight to fail.
ports: PORTS,
...getCleanupOptionsForMode(E2E_DEV_MODE),
},
);
Expand Down Expand Up @@ -315,6 +321,8 @@ if (!SELECTED_LANGUAGE || SELECTED_LANGUAGE === "py") {
APP_NAMES.PYTHON_TESTS,
{
logPrefix: "Python S3Queue Test (With Env Vars)",
// See note on the TypeScript describe block above.
ports: PORTS,
...getCleanupOptionsForMode(E2E_DEV_MODE),
},
);
Expand Down
8 changes: 6 additions & 2 deletions apps/framework-cli-e2e/test/unloaded-files-warning.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,13 @@ describe("Unloaded Files Warning", () => {
let devProcess: ChildProcess | null = null;

afterEach(async () => {
await stopDevProcess(devProcess, { logger: testLogger });
await stopDevProcess(devProcess, { logger: testLogger, ports: PORTS });
devProcess = null;
await killRemainingProcesses({ logger: testLogger });
// Pass the test's actual offset ports so killRemainingProcesses both
// targets them with SIGKILL and waits until they are released. Without
// this, ClickHouse's async shutdown still holds files in testDir when
// `rmSync` runs, producing ENOTEMPTY.
await killRemainingProcesses({ logger: testLogger, ports: PORTS });

if (testDir && fs.existsSync(testDir)) {
testLogger.debug("Cleaning up test directory", { testDir });
Expand Down
137 changes: 121 additions & 16 deletions apps/framework-cli/src/cli/local_webserver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -571,26 +571,113 @@ async fn get_consumption_api_res(
let _ = execute!(std::io::stdout(), Print(msg + "\n"));
}

let mut client_req = reqwest::Request::new(req.method().clone(), url.parse()?);
// Capture method + headers up front so we can rebuild the reqwest::Request
Comment thread
callicles marked this conversation as resolved.
// across retry attempts (reqwest::Request isn't Clone when building via
// reqwest::Request::new). Only GET requests are proxied here and they
// carry no body, so rebuild-per-attempt is cheap.
let method = req.method().clone();
let hdrs: Vec<(hyper::http::HeaderName, hyper::http::HeaderValue)> = req
.headers()
.iter()
.map(|(k, v)| (k.clone(), v.clone()))
.collect();
let url_parsed: reqwest::Url = url.parse()?;

// Retry only on genuine connect errors — a transient "socket not
// accepting" window where the consumption-api is momentarily
// unavailable. In dev this is most often the hot-reload restart of the
// primary worker; in prod it can happen on any brief restart / socket
// reset. Other failures (timeouts, TLS, 5xx from upstream) surface once.
const MAX_ATTEMPTS: usize = 3;
const BACKOFF_MS: [u64; 2] = [150, 400]; // applied between attempts 1→2 and 2→3

let mut last_connect_err: Option<reqwest::Error> = None;
for attempt in 0..MAX_ATTEMPTS {
let mut client_req = reqwest::Request::new(method.clone(), url_parsed.clone());
let req_headers = client_req.headers_mut();
for (k, v) in hdrs.iter() {
req_headers.insert(k, v.clone());
}

// Copy headers
let headers = client_req.headers_mut();
for (key, value) in req.headers() {
headers.insert(key, value.clone());
match http_client.execute(client_req).await {
Ok(res) => {
let status = res.status();
let body = res.bytes().await?;
return Ok(add_cors_headers(Response::builder())
.status(status)
.header("Content-Type", "application/json")
.body(Full::new(body))
.unwrap());
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Err(e) if is_connect_error(&e) => {
debug!(
"consumption proxy connect error on attempt {}/{}: {}",
attempt + 1,
MAX_ATTEMPTS,
e
);
last_connect_err = Some(e);
if let Some(&ms) = BACKOFF_MS.get(attempt) {
tokio::time::sleep(std::time::Duration::from_millis(ms)).await;
continue;
}
// No more attempts left.
break;
}
Err(e) => {
// Non-connect error: bail immediately, preserving existing
// behavior (caller turns this into 500 "Error").
return Err(e.into());
}
}
}

// Send request
let res = http_client.execute(client_req).await?;
let status = res.status();
let body = res.bytes().await?;

let returned_response = add_cors_headers(Response::builder())
.status(status)
// All retries exhausted with connect errors — return a structured 503 so
// agents can reason about this as a transient hot-reload window instead
// of an ambiguous 500. Retry-After tells well-behaved clients to try
// again shortly.
let body = serde_json::json!({
"error": "consumption_api_unavailable",
"retryable": true,
"message": format!(
"Consumption API is temporarily unavailable. \
Retry shortly. Attempts: {}/{}.",
MAX_ATTEMPTS, MAX_ATTEMPTS
),
"upstream_error": last_connect_err.as_ref().map(|e| e.to_string()),
});
Ok(add_cors_headers(Response::builder())
.status(StatusCode::SERVICE_UNAVAILABLE)
.header("Content-Type", "application/json")
.body(Full::new(body))
.unwrap();
.header("Retry-After", "1")
.body(Full::new(Bytes::from(body.to_string())))?)
}

Ok(returned_response)
/// Decide whether a reqwest failure is a pure connect-failure (the server
/// is not accepting connections) as opposed to anything else.
///
/// Connect errors are the hot-reload race signature and the only case the
/// caller retries — timeouts, TLS failures, and upstream 5xx responses are
/// surfaced to the caller on the first attempt.
fn is_connect_error(e: &reqwest::Error) -> bool {
    use std::error::Error as _;

    // Fast path: reqwest already classified this as a connect failure.
    if e.is_connect() {
        return true;
    }

    // Slow path: reqwest wraps hyper, which wraps std::io::Error — walk the
    // source chain looking for a connection-flavored io error kind.
    let mut cause = e.source();
    while let Some(inner) = cause {
        if let Some(io_err) = inner.downcast_ref::<std::io::Error>() {
            use std::io::ErrorKind::*;
            match io_err.kind() {
                ConnectionRefused | ConnectionReset | ConnectionAborted | NotConnected => {
                    return true
                }
                _ => {}
            }
        }
        cause = inner.source();
    }
    false
}

#[derive(Clone)]
Expand Down Expand Up @@ -2698,7 +2785,11 @@ impl Webserver {
.await
.unwrap_or_else(|e| handle_listener_err(management_socket.port(), e));

// Check if proxy port is available
// Defense in depth: quick bind-and-drop on proxy_port so Docker-mode
// users get a clear error before the Node worker spawns. The
// --dockerless path also runs a structured preflight in
// NativeInfraProvider::start that covers this port alongside the
// embedded-infra ports.
let proxy_socket = self.get_socket(project.http_server_config.proxy_port).await;
TcpListener::bind(proxy_socket)
.await
Expand Down Expand Up @@ -3115,7 +3206,21 @@ async fn shutdown(

// Step 5: Shut down native infrastructure (embedded servers + child processes).
if project.dev.dockerless {
super::display::show_message_wrapper(
MessageType::Highlight,
Message {
action: "Stopping".to_string(),
details: "native infrastructure (up to ~10s per service)...".to_string(),
},
);
crate::utilities::native_infra::stop_native_infra(project);
super::display::show_message_wrapper(
MessageType::Success,
Message {
action: "Stopped".to_string(),
details: "native infrastructure".to_string(),
},
);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
use std::collections::HashMap;
use std::net::TcpListener;
use std::path::PathBuf;
use std::thread;
use std::time::{Duration, Instant};

use tracing::{info, instrument};
use tracing::{info, instrument, warn};

use crate::cli::logger::{context, resource_type};
use crate::utilities::system::{RestartPolicy, RestartingProcess, StartChildFn};
Expand Down Expand Up @@ -78,6 +81,20 @@ impl ConsumptionProcessRegistry {
let jwt_config = self.jwt_config.clone();
let proxy_port = self.proxy_port;

// Hot-reload race mitigation: when ConsumptionApiWebServer::Updated
// fires, we call stop() then start() back-to-back. The prior Node
// primary's workers drain connections asynchronously, so :proxy_port
// may still be held by the kernel when the new primary tries to
// fork workers — every new worker's listen() then fails EADDRINUSE
// and the cluster enters a retry storm.
//
// Here we wait until we can successfully bind the port ourselves
// (releasing it immediately after). That proves the old listener is
// gone, so the next process's listen() will succeed cleanly.
if let Some(port) = proxy_port {
wait_for_port_free("127.0.0.1", port);
}

let start_child: StartChildFn<ConsumptionError> = match self.language {
SupportedLanguages::Python => Box::new(move || {
python::consumption::run(
Expand Down Expand Up @@ -132,3 +149,32 @@ impl ConsumptionProcessRegistry {
Ok(())
}
}

/// Block (briefly) until `host:port` is bindable, i.e. any prior listener
/// has fully released it. Polls every 100ms up to 5s. Non-fatal: on timeout
/// we log a warning and return so the caller can still attempt the spawn —
/// the Node-side `server.on('error')` handler will catch a lingering
/// EADDRINUSE and the cluster's restart path will recover.
fn wait_for_port_free(host: &str, port: u16) {
const INTERVAL: Duration = Duration::from_millis(100);
const TIMEOUT: Duration = Duration::from_secs(5);

let deadline = Instant::now() + TIMEOUT;
loop {
// A successful bind here proves the port is free. The TcpListener
// is dropped at the end of this scope, releasing the fd. Since we
// never accept a connection on it, no TIME_WAIT is created, so the
// Node worker can bind the same port a few ms later.
match TcpListener::bind((host, port)) {
Comment thread
callicles marked this conversation as resolved.
Ok(_) => return,
Err(_) if Instant::now() < deadline => thread::sleep(INTERVAL),
Err(e) => {
warn!(
"consumption-api: port {port} still held after {TIMEOUT:?} ({e}); \
continuing — Node-side error handler will manage the retry"
);
return;
}
}
}
}
14 changes: 14 additions & 0 deletions apps/framework-cli/src/utilities/native_infra/clickhouse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,20 @@ use tracing::info;
/// Data directory layout under `{project}/.moose/native_infra/clickhouse/`.
const NATIVE_CH_DIR: &str = "native_infra/clickhouse";

/// `ps -o comm=` value for the ClickHouse watchdog process.
///
/// On Linux the watchdog calls `prctl(PR_SET_NAME, "clckhouse-watch")` at
/// startup — the 'i' is dropped so the name fits the 15-char
/// `TASK_COMM_LEN`. On macOS no such rename happens and `ps` reports
/// `clickhouse` for the spawned binary. Persisting the platform-appropriate
/// string in the PID file lets `process_matches` confirm identity before
/// sending SIGTERM on shutdown on both targets.
// NOTE: the truncated spelling "clckhouse-watch" is intentional — it must
// match the kernel-truncated comm value exactly, so do not "fix" the typo.
#[cfg(target_os = "linux")]
pub const WATCHDOG_COMM: &str = "clckhouse-watch";

// Non-Linux (macOS in practice): no comm rename occurs, so the process
// reports the plain binary name.
#[cfg(not(target_os = "linux"))]
pub const WATCHDOG_COMM: &str = "clickhouse";

/// Ensure the ClickHouse binary is cached and return its path.
pub fn ensure_binary(manager: &BinaryManager) -> Result<PathBuf, NativeInfraError> {
let (url, archive_path, expected_sha256) = clickhouse_download_metadata();
Expand Down
6 changes: 6 additions & 0 deletions apps/framework-cli/src/utilities/native_infra/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,4 +100,10 @@ pub enum NativeInfraError {

#[error("health check failed for {service}: {reason}")]
HealthCheck { service: String, reason: String },

#[error("{0}")]
PortConflict(#[from] super::preflight::PortConflictError),

#[error(transparent)]
InvalidPort(#[from] super::preflight::InvalidPortError),
}
Loading
Loading