Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 21 additions & 17 deletions auraed/src/cells/cell_service/cell_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,7 @@ use proto::{
},
observe::LogChannelType,
};
use std::os::unix::fs::MetadataExt;
use std::time::Duration;
use std::{process::ExitStatus, sync::Arc};
use std::{os::unix::fs::MetadataExt, sync::Arc, time::Duration};
use tokio::sync::Mutex;
use tonic::{Code, Request, Response, Status};
use tracing::{info, instrument, trace, warn};
Expand Down Expand Up @@ -223,8 +221,7 @@ impl CellService {
// Retrieve the process ID (PID) of the started executable
let pid = executable
.pid()
.map_err(CellsServiceError::Io)?
.expect("pid")
.expect("started executable has captured pid")
.as_raw();

// Register the stdout log channel for the executable's PID
Expand Down Expand Up @@ -290,28 +287,24 @@ impl CellService {
assert!(cell_name.is_none());
info!("CellService: stop() executable_name={:?}", executable_name,);

let pid = {
let (pid, stop_result) = {
let mut executables = self.executables.lock().await;

// Retrieve the process ID (PID) of the executable to be stopped
// pid is captured at spawn (see Executable::start), so it is
// available for cache entries regardless of whether Tokio has
// already reaped the leader.
let pid = executables
.get(&executable_name)
.map_err(CellsServiceError::ExecutablesError)?
.pid()
.map_err(CellsServiceError::Io)?
.expect("pid")
.expect("started executable has captured pid")
.as_raw();

// Stop the executable and handle any errors
let _: ExitStatus = executables
.stop(&executable_name)
.await
.map_err(CellsServiceError::ExecutablesError)?;
let result = executables.stop(&executable_name).await;

pid
(pid, result)
};

// Remove the executable's logs from the observe service.
if let Err(e) = self
.observe_service
.unregister_sub_process_channel(pid, LogChannelType::Stdout)
Expand All @@ -327,7 +320,18 @@ impl CellService {
warn!("failed to unregister stderr channel for pid {pid}: {e}");
}

Ok(Response::new(CellServiceStopResponse::default()))
use super::executables::ExecutablesError;
match stop_result {
Ok(_)
| Err(ExecutablesError::ExecutableNotFound { .. })
| Err(ExecutablesError::ExecutableAlreadyExited { .. }) => {
Ok(Response::new(CellServiceStopResponse::default()))
}
Err(e) => Err(Status::internal(format!(
"executable '{}' failed to stop: {}",
executable_name, e
))),
}
}

#[tracing::instrument(skip(self))]
Expand Down
3 changes: 2 additions & 1 deletion auraed/src/cells/cell_service/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ impl From<CellsServiceError> for Status {
ExecutablesError::ExecutableExists { .. } => {
Status::already_exists(msg)
}
ExecutablesError::ExecutableNotFound { .. } => {
ExecutablesError::ExecutableNotFound { .. }
| ExecutablesError::ExecutableAlreadyExited { .. } => {
Status::not_found(msg)
}
ExecutablesError::FailedToStartExecutable { .. }
Expand Down
2 changes: 2 additions & 0 deletions auraed/src/cells/cell_service/executables/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ pub enum ExecutablesError {
ExecutableExists { executable_name: ExecutableName },
#[error("executable '{executable_name}' not found")]
ExecutableNotFound { executable_name: ExecutableName },
#[error("executable '{executable_name}' had already exited before stop")]
ExecutableAlreadyExited { executable_name: ExecutableName },
#[error("executable '{executable_name}' failed to start: {source}")]
FailedToStartExecutable {
executable_name: ExecutableName,
Expand Down
39 changes: 28 additions & 11 deletions auraed/src/cells/cell_service/executables/executable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
\* -------------------------------------------------------------------------- */
use super::{ExecutableName, ExecutableSpec};
use crate::logging::log_channel::LogChannel;
use nix::sys::signal::{Signal, killpg};
use nix::unistd::Pid;
use std::{
ffi::OsString,
Expand All @@ -34,6 +35,9 @@ pub struct Executable {
pub stdout: LogChannel,
pub stderr: LogChannel,
state: ExecutableState,
// Captured at spawn so killpg targets the right group even after Tokio
// internally reaps the leader (which clears `Child::id()`).
pid: Option<Pid>,
}

#[derive(Debug)]
Expand All @@ -59,7 +63,7 @@ impl Executable {
let state = ExecutableState::Init { command };
let stdout = LogChannel::new(format!("{name}::stdout"));
let stderr = LogChannel::new(format!("{name}::stderr"));
Self { name, description, stdout, stderr, state }
Self { name, description, stdout, stderr, state, pid: None }
}

/// Starts the underlying process.
Expand All @@ -77,14 +81,17 @@ impl Executable {
.kill_on_drop(true)
.current_dir("/")
.stdout(Stdio::piped())
.stderr(Stdio::piped());
.stderr(Stdio::piped())
.process_group(0);

if let Some(uid) = uid {
command = command.uid(uid);
}
if let Some(gid) = gid {
command = command.gid(gid);
}
let mut child = command.spawn()?;
self.pid = child.id().map(|id| Pid::from_raw(id as i32));

let log_channel = self.stdout.clone();
let stdout = child.stdout.take().expect("stdout");
Expand Down Expand Up @@ -140,26 +147,36 @@ impl Executable {
/// Stops the executable and returns the [ExitStatus].
/// If the executable has never been started, returns [None].
pub async fn kill(&mut self) -> io::Result<Option<ExitStatus>> {
// Pid is captured at spawn (Pid is Copy), so we can use it even
// after Tokio internally reaps the leader.
let captured_pid = self.pid;
Ok(match &mut self.state {
ExecutableState::Init { .. } => None,
ExecutableState::Started { child, stdout, stderr, .. } => {
child.kill().await?;
// killpg the whole group (PGID == child PID via
// process_group(0)); child.kill() would only signal the
// leader and orphan grandchildren that joined the group.
let killpg_result = killpg(
captured_pid.expect("started exe has captured pid"),
Signal::SIGKILL,
)
.map_err(io::Error::from);
// Always reap and join the reader tasks, even if killpg
// failed — otherwise the Child stays un-awaited and the
// stdout/stderr handles leak until their pipes close.
let exit_status = child.wait().await?;
let _ = tokio::join!(stdout, stderr);
self.state = ExecutableState::Stopped(exit_status);
killpg_result?;
Some(exit_status)
}
ExecutableState::Stopped(status) => Some(*status),
})
}

/// Returns the [Pid] while [Executable] is running, otherwise returns [None].
pub fn pid(&self) -> io::Result<Option<Pid>> {
let ExecutableState::Started { child: process, .. } = &self.state
else {
return Ok(None);
};

Ok(process.id().map(|id| Pid::from_raw(id as i32)))
/// Returns the captured [Pid] for executables that have been started.
/// Returns [None] before [Executable::start] has been called.
pub fn pid(&self) -> Option<Pid> {
self.pid
}
}
145 changes: 119 additions & 26 deletions auraed/src/cells/cell_service/executables/executables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@
use super::{
Executable, ExecutableName, ExecutableSpec, ExecutablesError, Result,
};
use nix::libc;
use std::{collections::HashMap, process::ExitStatus};
use tracing::warn;

type Cache = HashMap<ExecutableName, Executable>;

Expand Down Expand Up @@ -82,37 +84,53 @@ impl Executables {
});
};

let exit_status = executable.kill().await.map_err(|e| {
ExecutablesError::FailedToStopExecutable {
executable_name: executable_name.clone(),
source: e,
match executable.kill().await {
Ok(Some(status)) => {
let _ = self.cache.remove(executable_name);
Ok(status)
}
})?;

let Some(exit_status) = exit_status else {
// Exes that never started return None
let executable =
self.cache.remove(executable_name).expect("exe in cache");
return Err(ExecutablesError::ExecutableNotFound {
executable_name: executable.name,
});
};

let _ = self.cache.remove(executable_name).ok_or_else(|| {
// get_mut would have already thrown this error, so we should never reach here
ExecutablesError::ExecutableNotFound {
executable_name: executable_name.clone(),
Ok(None) => {
// Cache invariant: only started executables are inserted
// into the cache (see `start` above), so kill() on a cached
// entry cannot return Ok(None).
unreachable!(
"executable {executable_name:?} is in cache without \
having been started"
);
}
})?;

Ok(exit_status)
Err(e)
if matches!(
e.raw_os_error(),
Some(libc::ESRCH) | Some(libc::ECHILD)
) =>
{
// killpg ESRCH (group already empty) or wait ECHILD (kernel
// already reaped). Process is gone; evict and report
// distinctly so callers can render stop idempotent without
// collapsing this with "name not in cache".
warn!(
"executable {executable_name:?} already exited before \
stop: {e}"
);
let _ = self.cache.remove(executable_name);
Err(ExecutablesError::ExecutableAlreadyExited {
executable_name: executable_name.clone(),
})
}
Err(e) => Err(ExecutablesError::FailedToStopExecutable {
executable_name: executable_name.clone(),
source: e,
}),
}
}

/// Stops all executables concurrently
pub async fn broadcast_stop(&mut self) {
let mut names = vec![];
for exe in self.cache.values_mut() {
let _ = exe.kill().await;
if let Err(e) = exe.kill().await {
warn!("broadcast_stop: failed to kill {:?}: {e}", exe.name);
}
names.push(exe.name.clone())
}

Expand All @@ -129,9 +147,16 @@ mod tests {
use tokio::process::Command;

fn spec_for(name: &ExecutableName) -> ExecutableSpec {
spec_with_command(name, "sleep 60")
}

fn spec_with_command(
name: &ExecutableName,
sh_arg: &str,
) -> ExecutableSpec {
let mut command = Command::new("sh");
let _ = command.arg("-c");
let _ = command.arg("sleep 60");
let _ = command.arg(sh_arg);
ExecutableSpec {
name: name.clone(),
description: format!("test executable {name}"),
Expand All @@ -150,8 +175,10 @@ mod tests {
let executable = executables
.start(spec_for(&exe_name), None, None)
.expect("start executable");
let pid = executable.pid().expect("read pid");
assert!(pid.is_some(), "expected spawned process to expose a pid");
assert!(
executable.pid().is_some(),
"expected spawned process to expose a pid"
);

let err = executables
.start(spec_for(&exe_name), None, None)
Expand All @@ -168,4 +195,70 @@ mod tests {
"expected graceful stop or SIGKILL, got status {status:?}"
);
}

/// Stopping a short-lived executable that has already finished running
/// must still return Ok (the cache holds the Stopped state) and must
/// evict the cache entry.
#[tokio::test]
async fn stop_after_natural_exit_returns_ok_and_evicts() {
let mut executables = Executables::default();
let exe_name = ExecutableName::new(format!(
"unit-test-self-exit-{}",
uuid::Uuid::new_v4()
));

let pid = executables
.start(spec_with_command(&exe_name, "true"), None, None)
.expect("start executable")
.pid()
.expect("captured pid")
.as_raw();

// Give the leader time to exit. It will sit as a zombie until
// child.wait() is called inside stop(); we just need to ensure the
// process has actually finished its work before we test stop().
let deadline =
std::time::Instant::now() + std::time::Duration::from_secs(5);
while std::fs::metadata(format!("/proc/{pid}/cmdline"))
.map(|_| true)
.unwrap_or(false)
&& std::time::Instant::now() < deadline
{
tokio::time::sleep(std::time::Duration::from_millis(20)).await;
}

let _ = executables
.stop(&exe_name)
.await
.expect("stop after natural exit should be Ok");

// Cache must have been evicted, so a second stop reports
// ExecutableNotFound (the cache-miss variant), distinct from
// ExecutableAlreadyExited.
let err = executables
.stop(&exe_name)
.await
.expect_err("second stop should report ExecutableNotFound");
assert!(
matches!(err, ExecutablesError::ExecutableNotFound { .. }),
"expected ExecutableNotFound after eviction, got {err:?}"
);
}

/// Stopping a name that was never inserted must return ExecutableNotFound,
/// not the already-exited variant.
#[tokio::test]
async fn stop_unknown_name_returns_not_found() {
let mut executables = Executables::default();
let exe_name = ExecutableName::new("never-started".to_string());

let err = executables
.stop(&exe_name)
.await
.expect_err("stop on unknown name should fail");
assert!(
matches!(err, ExecutablesError::ExecutableNotFound { .. }),
"expected ExecutableNotFound, got {err:?}"
);
}
}
5 changes: 0 additions & 5 deletions auraed/src/vms/virtual_machines.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ use std::{collections::HashMap, net::Ipv4Addr};

use anyhow::anyhow;
use net_util::MacAddr;
use nix::libc;
use tracing::error;
use vmm_sys_util::{rand, signal::block_signal};

Expand All @@ -39,10 +38,6 @@ impl Default for VirtualMachines {
impl VirtualMachines {
/// Create a new instance of the virtual machines cache.
pub fn new() -> Self {
unsafe {
let _ = libc::signal(libc::SIGCHLD, libc::SIG_IGN);
}

// Mask the signals handled by the Cloud Hyupervisor VMM so they only run on the dedicated signal handling thread
for sig in &vmm::vm::Vm::HANDLED_SIGNALS {
if let Err(e) = block_signal(*sig) {
Expand Down
Loading