Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 0 additions & 10 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

32 changes: 32 additions & 0 deletions Cross.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# Local cross-compile config for dec-bench harness tests.
# CI uses manylinux_2_28 directly; this is the convenience path for
# `cross build --target aarch64-unknown-linux-gnu -p moose-cli`.
[build.env]
# Tell openssl-sys to link the system OpenSSL 1.1 inside the container
# instead of vendoring OpenSSL 3.5.5. The vendored build strips deprecated
# symbols (SSL_get_peer_certificate), which librdkafka still calls — so the
# link fails. Using the system lib keeps both old and new names available.
passthrough = ["OPENSSL_LIB_DIR", "OPENSSL_INCLUDE_DIR", "OPENSSL_STATIC"]

[target.aarch64-unknown-linux-gnu]
# The pinned 0.2.5 image ships libssl 1.0.0, which lacks SSL_get_peer_certificate
# as an exported symbol and fails librdkafka's final link. `:main` is Ubuntu
# 20.04 with libssl 1.1.1, which has the symbol.
image = "ghcr.io/cross-rs/aarch64-unknown-linux-gnu:main"
# cross's /opt/toolchain.cmake pins CMAKE_FIND_ROOT_PATH to /usr/aarch64-linux-gnu
# (sysroot layout), but apt multiarch installs arm64 libs to
# /usr/lib/aarch64-linux-gnu (Debian layout). rdkafka-sys's cmake build of
# librdkafka does FIND_PACKAGE(ZLIB) / FIND_PACKAGE(OpenSSL) and can't see
# across that gap — so we symlink the multiarch lib/headers into the sysroot.
pre-build = [
    "dpkg --add-architecture arm64",
    "apt-get update",
    "apt-get install -y --no-install-recommends protobuf-compiler libprotobuf-dev cmake pkg-config zlib1g-dev:arm64 libssl-dev:arm64",
    "mkdir -p /usr/aarch64-linux-gnu/lib /usr/aarch64-linux-gnu/include",
    "ln -sf /usr/lib/aarch64-linux-gnu/libz.so /usr/aarch64-linux-gnu/lib/libz.so",
    "ln -sf /usr/lib/aarch64-linux-gnu/libz.a /usr/aarch64-linux-gnu/lib/libz.a",
    "ln -sf /usr/include/zlib.h /usr/aarch64-linux-gnu/include/zlib.h",
    "ln -sf /usr/include/zconf.h /usr/aarch64-linux-gnu/include/zconf.h",
    "for f in libssl.so libssl.a libcrypto.so libcrypto.a; do ln -sf /usr/lib/aarch64-linux-gnu/$f /usr/aarch64-linux-gnu/lib/$f; done",
    "ln -sfn /usr/include/openssl /usr/aarch64-linux-gnu/include/openssl",
]
2 changes: 1 addition & 1 deletion apps/framework-cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ posthog514client-rs = { path = "../../packages/posthog514client-rs" }
# Remove this fork once upstream adds dialect-aware serialization.
sqlparser = { git = "https://github.com/514-labs/datafusion-sqlparser-rs", rev = "f84bd06", features = ["visitor"] }
itertools = "0.13.0"
openssl = { version = "0.10", features = ["vendored"] }
openssl = { version = "0.10" }
clap = { version = "4.3.17", features = ["derive"] }
tokio = { version = "1", features = ["full"] }
thiserror = "1.0.59"
Expand Down
121 changes: 105 additions & 16 deletions apps/framework-cli/src/cli/local_webserver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -571,26 +571,111 @@ async fn get_consumption_api_res(
let _ = execute!(std::io::stdout(), Print(msg + "\n"));
}

let mut client_req = reqwest::Request::new(req.method().clone(), url.parse()?);
// Capture method + headers up front so we can rebuild the reqwest::Request
Comment thread
callicles marked this conversation as resolved.
// across retry attempts (reqwest::Request isn't Clone when building via
// reqwest::Request::new). Only GET requests are proxied here and they
// carry no body, so rebuild-per-attempt is cheap.
let method = req.method().clone();
let hdrs: Vec<(hyper::http::HeaderName, hyper::http::HeaderValue)> = req
.headers()
.iter()
.map(|(k, v)| (k.clone(), v.clone()))
.collect();
let url_parsed: reqwest::Url = url.parse()?;

// Retry only on genuine connect errors — these are the hot-reload window
// (consumption-api primary restarting → :proxy_port momentarily closed).
// Other failures (timeouts, TLS errors, 5xx from upstream) surface once.
const MAX_ATTEMPTS: usize = 3;
const BACKOFF_MS: [u64; 2] = [150, 400]; // applied between attempts 1→2 and 2→3

let mut last_connect_err: Option<reqwest::Error> = None;
for attempt in 0..MAX_ATTEMPTS {
let mut client_req = reqwest::Request::new(method.clone(), url_parsed.clone());
let req_headers = client_req.headers_mut();
for (k, v) in hdrs.iter() {
req_headers.insert(k, v.clone());
}

// Copy headers
let headers = client_req.headers_mut();
for (key, value) in req.headers() {
headers.insert(key, value.clone());
match http_client.execute(client_req).await {
Ok(res) => {
let status = res.status();
let body = res.bytes().await?;
return Ok(add_cors_headers(Response::builder())
.status(status)
.header("Content-Type", "application/json")
.body(Full::new(body))
.unwrap());
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Err(e) if is_connect_error(&e) => {
debug!(
"consumption proxy connect error on attempt {}/{}: {}",
attempt + 1,
MAX_ATTEMPTS,
e
);
last_connect_err = Some(e);
if let Some(&ms) = BACKOFF_MS.get(attempt) {
tokio::time::sleep(std::time::Duration::from_millis(ms)).await;
continue;
}
// No more attempts left.
break;
}
Err(e) => {
// Non-connect error: bail immediately, preserving existing
// behavior (caller turns this into 500 "Error").
return Err(e.into());
}
}
}

// Send request
let res = http_client.execute(client_req).await?;
let status = res.status();
let body = res.bytes().await?;

let returned_response = add_cors_headers(Response::builder())
.status(status)
// All retries exhausted with connect errors — return a structured 503 so
// agents can reason about this as a transient hot-reload window instead
// of an ambiguous 500. Retry-After tells well-behaved clients to try
// again shortly.
let body = serde_json::json!({
"error": "consumption_api_unavailable",
"retryable": true,
"message": format!(
"Consumption API is temporarily unavailable (restarting on hot reload). \
Comment thread
callicles marked this conversation as resolved.
Outdated
Retry shortly. Attempts: {}/{}.",
MAX_ATTEMPTS, MAX_ATTEMPTS
),
"upstream_error": last_connect_err.as_ref().map(|e| e.to_string()),
});
Ok(add_cors_headers(Response::builder())
.status(StatusCode::SERVICE_UNAVAILABLE)
.header("Content-Type", "application/json")
.body(Full::new(body))
.unwrap();
.header("Retry-After", "1")
.body(Full::new(Bytes::from(body.to_string())))?)
}

/// Classify a reqwest error as a pure connect-failure (server not accepting
/// connections) vs anything else. Connect errors are the hot-reload race
/// signature and the only case we want to retry — timeouts / TLS / upstream
/// 5xx are surfaced directly.
fn is_connect_error(e: &reqwest::Error) -> bool {
use std::error::Error as _;

Ok(returned_response)
if e.is_connect() {
return true;
}
// reqwest wraps hyper wraps std::io::Error; walk the source chain.
let mut src: Option<&(dyn std::error::Error + 'static)> = e.source();
while let Some(err) = src {
if let Some(io_err) = err.downcast_ref::<std::io::Error>() {
use std::io::ErrorKind::*;
if matches!(
io_err.kind(),
ConnectionRefused | ConnectionReset | ConnectionAborted | NotConnected
) {
return true;
}
}
src = err.source();
}
false
}

#[derive(Clone)]
Expand Down Expand Up @@ -2698,7 +2783,11 @@ impl Webserver {
.await
.unwrap_or_else(|e| handle_listener_err(management_socket.port(), e));

// Check if proxy port is available
// Defense in depth: quick bind-and-drop on proxy_port so Docker-mode
// users get a clear error before the Node worker spawns. The
// --dockerless path also runs a structured preflight in
// NativeInfraProvider::start that covers this port alongside the
// embedded-infra ports.
let proxy_socket = self.get_socket(project.http_server_config.proxy_port).await;
TcpListener::bind(proxy_socket)
.await
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
use std::collections::HashMap;
use std::net::TcpListener;
use std::path::PathBuf;
use std::thread;
use std::time::{Duration, Instant};

use tracing::{info, instrument};
use tracing::{info, instrument, warn};

use crate::cli::logger::{context, resource_type};
use crate::utilities::system::{RestartPolicy, RestartingProcess, StartChildFn};
Expand Down Expand Up @@ -78,6 +81,20 @@ impl ConsumptionProcessRegistry {
let jwt_config = self.jwt_config.clone();
let proxy_port = self.proxy_port;

// Hot-reload race mitigation: when ConsumptionApiWebServer::Updated
// fires, we call stop() then start() back-to-back. The prior Node
// primary's workers drain connections asynchronously, so :proxy_port
// may still be held by the kernel when the new primary tries to
// fork workers — every new worker's listen() then fails EADDRINUSE
// and the cluster enters a retry storm.
//
// Here we wait until we can successfully bind the port ourselves
// (releasing it immediately after). That proves the old listener is
// gone, so the next process's listen() will succeed cleanly.
if let Some(port) = proxy_port {
wait_for_port_free("127.0.0.1", port);
}

let start_child: StartChildFn<ConsumptionError> = match self.language {
SupportedLanguages::Python => Box::new(move || {
python::consumption::run(
Expand Down Expand Up @@ -132,3 +149,33 @@ impl ConsumptionProcessRegistry {
Ok(())
}
}

/// Block (briefly) until `host:port` is bindable, i.e. any prior listener
/// has fully released it. Polls every 100ms up to 5s. Non-fatal: on timeout
/// we log a warning and return so the caller can still attempt the spawn —
/// the Node-side `server.on('error')` handler will catch a lingering
/// EADDRINUSE and the cluster's restart path will recover.
fn wait_for_port_free(host: &str, port: u16) {
const INTERVAL: Duration = Duration::from_millis(100);
const TIMEOUT: Duration = Duration::from_secs(5);

let deadline = Instant::now() + TIMEOUT;
loop {
// A successful bind here proves the port is free. The TcpListener is
// dropped at end of this scope, releasing the port for the real
// owner to grab a few ms later. Using SO_REUSEADDR (Rust default on
// Linux) means the subsequent bind by the Node worker won't be
// blocked by our brief hold.
Comment thread
callicles marked this conversation as resolved.
Outdated
match TcpListener::bind((host, port)) {
Comment thread
callicles marked this conversation as resolved.
Ok(_) => return,
Err(_) if Instant::now() < deadline => thread::sleep(INTERVAL),
Err(e) => {
warn!(
"consumption-api: port {port} still held after {TIMEOUT:?} ({e}); \
continuing — Node-side error handler will manage the retry"
);
return;
}
}
}
}
3 changes: 3 additions & 0 deletions apps/framework-cli/src/utilities/native_infra/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,4 +100,7 @@ pub enum NativeInfraError {

#[error("health check failed for {service}: {reason}")]
HealthCheck { service: String, reason: String },

#[error("{0}")]
PortConflict(#[from] super::preflight::PortConflictError),
}
14 changes: 14 additions & 0 deletions apps/framework-cli/src/utilities/native_infra/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ pub mod clickhouse;
pub mod devkafka;
pub mod devredis;
pub mod errors;
pub mod preflight;
pub mod temporal;

use crate::cli::display::{with_spinner_completion, with_timing, Message};
Expand Down Expand Up @@ -126,6 +127,19 @@ impl InfraProvider for NativeInfraProvider {
}

fn start(&self, project: &Project) -> Result<(), RoutineFailure> {
// Preflight: surface EADDRINUSE in a single actionable message before
// anything starts. Prevents the Node consumption worker from entering
// an unbounded restart loop when a prior `moose dev --dockerless` is
// still holding ports 4001 / 6379 / 19092.
let specs = preflight::port_specs_for(
project,
self.scripts_enabled,
/* include_webserver = */ true,
Comment thread
callicles marked this conversation as resolved.
Outdated
);
preflight::check_ports(&specs, &preflight::native_dir_for(project))
.map_err(NativeInfraError::from)
.map_err(Self::map_native_err)?;

// Start embedded devredis (Redis needed early for leadership/presence)
let devredis_handle = with_timing("Start devredis", || {
with_spinner_completion(
Expand Down
Loading
Loading