Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 58 additions & 6 deletions crates/atuin/src/command/client/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ use std::time::{Duration, Instant};
use atuin_client::{
database::Sqlite, history::History, record::sqlite_store::SqliteStore, settings::Settings,
};
use atuin_daemon::client::{DaemonClientErrorKind, HistoryClient, classify_error};
use atuin_daemon::DaemonEvent;
use atuin_daemon::client::{ControlClient, DaemonClientErrorKind, HistoryClient, classify_error};
use clap::Subcommand;
#[cfg(unix)]
use daemonize::Daemonize;
Expand Down Expand Up @@ -343,7 +344,14 @@ fn ensure_autostart_supported(settings: &Settings) -> Result<()> {
Ok(())
}

async fn restart_daemon(settings: &Settings) -> Result<HistoryClient> {
/// Ensure the daemon is running, starting it if necessary.
///
/// If the daemon is already running and up-to-date, this is a no-op.
/// If it is not running or needs a restart, this will spawn a new daemon
/// process and wait for it to become ready.
///
/// Returns an error if the daemon could not be started.
pub async fn ensure_daemon_running(settings: &Settings) -> Result<()> {
ensure_autostart_supported(settings)?;

let timeout = startup_timeout(settings);
Expand All @@ -352,9 +360,9 @@ async fn restart_daemon(settings: &Settings) -> Result<HistoryClient> {
let startup_lock = wait_for_lock(&startup_lock_path, timeout).await?;

match probe(settings).await {
Probe::Ready(client) => {
Probe::Ready(_) => {
drop(startup_lock);
return Ok(client);
return Ok(());
}
Probe::NeedsRestart(_) => {
request_shutdown(settings).await;
Expand All @@ -373,10 +381,15 @@ async fn restart_daemon(settings: &Settings) -> Result<HistoryClient> {
remove_stale_socket_if_present(settings)?;

spawn_daemon_process()?;
let client = wait_until_ready(settings, timeout).await?;
let _ = wait_until_ready(settings, timeout).await?;

drop(startup_lock);
Ok(client)
Ok(())
}

async fn restart_daemon(settings: &Settings) -> Result<HistoryClient> {
ensure_daemon_running(settings).await?;
connect_client(settings).await
}

fn ensure_reply_compatible(settings: &Settings, version: &str, protocol: u32) -> Result<()> {
Expand Down Expand Up @@ -465,6 +478,45 @@ pub async fn end_history(settings: &Settings, id: String, duration: u64, exit: i
Ok(())
}

/// Emit a daemon event, auto-starting the daemon if it is not running.
///
/// If the daemon is not reachable and `daemon.autostart` is enabled, this
/// will start the daemon and retry the event. If the daemon cannot be
/// started or the retry fails, a warning is printed to stderr.
pub async fn emit_event(settings: &Settings, event: DaemonEvent) {
// Try to connect and send
match ControlClient::from_settings(settings).await {
Ok(mut client) => {
if let Err(e) = client.send_event(event).await {
tracing::debug!(?e, "failed to send event to daemon");
}
return;
}
Err(e) if !settings.daemon.autostart || !should_retry_after_error(&e) => {
tracing::debug!(?e, "daemon not available, skipping event emission");
return;
}
Err(_) => {}
}

// Auto-start the daemon and retry
if let Err(e) = ensure_daemon_running(settings).await {
eprintln!("Could not start daemon: {e}");
return;
}

match ControlClient::from_settings(settings).await {
Ok(mut client) => {
if let Err(e) = client.send_event(event).await {
eprintln!("Daemon started but failed to send event: {e}");
}
}
Err(e) => {
eprintln!("Daemon started but failed to connect: {e}");
}
}
}

pub async fn tail_client(settings: &Settings) -> Result<HistoryClient> {
match probe(settings).await {
Probe::Ready(client) => return Ok(client),
Expand Down
12 changes: 6 additions & 6 deletions crates/atuin/src/command/client/history.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,15 @@ use clap::Subcommand;
use eyre::{Context, Result, bail};
use runtime_format::{FormatKey, FormatKeyError, ParseSegment, ParsedFmt};

#[cfg(feature = "daemon")]
use super::daemon as daemon_cmd;
#[cfg(feature = "daemon")]
use colored::Colorize;
#[cfg(feature = "daemon")]
use serde::Serialize;

#[cfg(feature = "daemon")]
use atuin_daemon::{
emit_event,
history::{HistoryEventKind, TailHistoryReply},
};
use atuin_daemon::history::{HistoryEventKind, TailHistoryReply};

use atuin_client::{
database::{Database, Sqlite, current_context},
Expand Down Expand Up @@ -989,7 +988,7 @@ impl Cmd {
}

#[cfg(feature = "daemon")]
let _ = emit_event(atuin_daemon::DaemonEvent::HistoryPruned).await;
daemon_cmd::emit_event(settings, atuin_daemon::DaemonEvent::HistoryPruned).await;
}
Ok(())
}
Expand Down Expand Up @@ -1050,7 +1049,8 @@ impl Cmd {
}

#[cfg(feature = "daemon")]
let _ = emit_event(atuin_daemon::DaemonEvent::HistoryDeleted { ids }).await;
daemon_cmd::emit_event(settings, atuin_daemon::DaemonEvent::HistoryDeleted { ids })
.await;
}
Ok(())
}
Expand Down
80 changes: 60 additions & 20 deletions crates/atuin/src/command/client/search/engines/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use atuin_client::{
history::History,
settings::{SearchMode, Settings},
};
use atuin_daemon::client::SearchClient;
use atuin_daemon::client::{DaemonClientErrorKind, SearchClient, classify_error};
use atuin_nucleo_matcher::{
Config, Matcher, Utf32Str,
pattern::{CaseMatching, Normalization, Pattern},
Expand All @@ -14,10 +14,12 @@ use tracing::{Level, debug, instrument, span};
use uuid::Uuid;

use super::{SearchEngine, SearchState};
use crate::command::client::daemon;

pub struct Search {
client: Option<SearchClient>,
query_id: u64,
settings: Settings,
#[cfg(unix)]
socket_path: String,
#[cfg(not(unix))]
Expand All @@ -29,6 +31,7 @@ impl Search {
Search {
client: None,
query_id: 0,
settings: settings.clone(),
#[cfg(unix)]
socket_path: settings.daemon.socket_path.clone(),
#[cfg(not(unix))]
Expand All @@ -39,17 +42,31 @@ impl Search {
#[instrument(skip_all, level = Level::TRACE, name = "get_daemon_client")]
async fn get_client(&mut self) -> Result<&mut SearchClient> {
if self.client.is_none() {
#[cfg(unix)]
let client = SearchClient::new(self.socket_path.clone()).await?;

#[cfg(not(unix))]
let client = SearchClient::new(self.tcp_port).await?;

self.client = Some(client);
self.connect().await?;
}
Ok(self.client.as_mut().unwrap())
}

async fn connect(&mut self) -> Result<()> {
#[cfg(unix)]
let client = SearchClient::new(self.socket_path.clone()).await?;

#[cfg(not(unix))]
let client = SearchClient::new(self.tcp_port).await?;

self.client = Some(client);
Ok(())
}

fn should_retry(err: &eyre::Report) -> bool {
matches!(
classify_error(err),
DaemonClientErrorKind::Connect
| DaemonClientErrorKind::Unavailable
| DaemonClientErrorKind::Unimplemented
)
}

fn next_query_id(&mut self) -> u64 {
self.query_id += 1;
self.query_id
Expand Down Expand Up @@ -115,17 +132,41 @@ impl SearchEngine for Search {
let span =
span!(Level::TRACE, "daemon_search.req_resp", query = %query, query_id = query_id);

let client = self.get_client().await?;

let _span = span.enter();
let mut stream = client
.search(
query.clone(),
query_id,
state.filter_mode,
Some(state.context.clone()),
)
.await?;
// Try to connect and search; if it fails with a retriable error,
// auto-start the daemon and retry once.
let first_attempt = async {
let client = self.get_client().await?;
client
.search(
query.clone(),
query_id,
state.filter_mode,
Some(state.context.clone()),
)
.await
}
.await;

let mut stream = match first_attempt {
Ok(stream) => stream,
Err(err) if self.settings.daemon.autostart && Self::should_retry(&err) => {
debug!("daemon not available, attempting auto-start");
self.client = None;

daemon::ensure_daemon_running(&self.settings).await?;

let client = self.get_client().await?;
client
.search(
query.clone(),
query_id,
state.filter_mode,
Some(state.context.clone()),
)
.await?
}
Err(err) => return Err(err),
};

let mut ids = Vec::with_capacity(200);
span!(Level::TRACE, "daemon_search.resp")
Expand Down Expand Up @@ -155,7 +196,6 @@ impl SearchEngine for Search {
}
})
.await;
drop(_span);
drop(span);

if ids.is_empty() {
Expand Down
4 changes: 2 additions & 2 deletions crates/atuin/src/command/client/store/rebuild.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use clap::Args;
use eyre::{Result, bail};

#[cfg(feature = "daemon")]
use atuin_daemon::emit_event;
use crate::command::client::daemon as daemon_cmd;

use atuin_client::{
database::Database, encryption, history::store::HistoryStore,
Expand Down Expand Up @@ -61,7 +61,7 @@ impl Rebuild {
history_store.build(database).await?;

#[cfg(feature = "daemon")]
let _ = emit_event(atuin_daemon::DaemonEvent::HistoryRebuilt).await;
daemon_cmd::emit_event(settings, atuin_daemon::DaemonEvent::HistoryRebuilt).await;

Ok(())
}
Expand Down