diff --git a/src/fetch.rs b/src/fetch.rs index 3cc50f4..379617a 100644 --- a/src/fetch.rs +++ b/src/fetch.rs @@ -15,6 +15,7 @@ use tokio::time::sleep; use crate::server::{Arguments, Network}; +#[derive(Clone)] pub struct Client { client: reqwest::Client, use_esplora: bool, diff --git a/src/main.rs b/src/main.rs index da2884b..a115c53 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,6 +1,7 @@ use clap::Parser; use env_logger::Env; use std::io::Write; +use tokio::sync::watch::{self, Receiver}; use waterfalls::server::{inner_main, Arguments}; #[tokio::main(flavor = "current_thread")] @@ -8,15 +9,21 @@ async fn main() { init_logging(); let args = Arguments::parse(); + let shutdown_signal = shutdown_signal().await; - inner_main(args, shutdown_signal()).await.unwrap(); // we want to panic in case of error so that the process exit with non-zero value + inner_main(args, shutdown_signal).await.unwrap(); // we want to panic in case of error so that the process exit with non-zero value } -async fn shutdown_signal() { - // Wait for the CTRL+C signal - tokio::signal::ctrl_c() - .await - .expect("failed to install CTRL+C signal handler"); +async fn shutdown_signal() -> Receiver<()> { + let (shutdown_sender, shutdown_receiver) = watch::channel::<()>(()); + tokio::spawn(async move { + // Wait for the CTRL+C signal + tokio::signal::ctrl_c() + .await + .expect("failed to install CTRL+C signal handler"); + let _ = shutdown_sender.send(()); + }); + shutdown_receiver } fn init_logging() { diff --git a/src/server/mod.rs b/src/server/mod.rs index 627c74f..095d7ed 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -1,6 +1,5 @@ //! Inside this module everything is needed to run the service providing the waterfalls protocol -use std::future::Future; use std::net::SocketAddr; use std::sync::Arc; @@ -18,6 +17,7 @@ use hyper::service::service_fn; use hyper_util::rt::TokioIo; use route::infallible_route; use tokio::net::TcpListener; +use tokio::sync::watch::Receiver; use tokio::sync::Mutex; pub mod encryption; @@ -219,7 +219,7 @@ fn get_store(args: &Arguments) -> Result { pub async fn inner_main( args: Arguments, - shutdown_signal: impl Future, + shutdown_signal: Receiver<()>, ) -> Result<(), Box> { args.is_valid()?; @@ -254,7 +254,8 @@ pub async fn inner_main( let _h1 = { let state = state.clone(); let client: Client = Client::new(&args); - tokio::spawn(async move { blocks_infallible(state, client).await }) + let shutdown_signal = shutdown_signal.clone(); + tokio::spawn(async move { blocks_infallible(state, client, shutdown_signal).await }) }; let _h2 = { @@ -272,9 +273,10 @@ pub async fn inner_main( let listener = TcpListener::bind(addr).await?; let client = Client::new(&args); let client = Arc::new(Mutex::new(client)); - let mut signal = std::pin::pin!(shutdown_signal); loop { + let mut signal = shutdown_signal.clone(); + tokio::select! { Ok( (stream, _)) = listener.accept() => { let io = TokioIo::new(stream); @@ -295,7 +297,7 @@ pub async fn inner_main( }); }, - _ = &mut signal => { + _ = signal.changed() => { log::info!("graceful shutdown signal received"); // stop the accept loop break; diff --git a/src/store/db.rs b/src/store/db.rs index a166f1e..024dd2b 100644 --- a/src/store/db.rs +++ b/src/store/db.rs @@ -111,7 +111,7 @@ impl DBStore { batch.put_cf(&cf, &key_buf, val); } - self.db.write(batch)?; + self.db.write(batch).context("Error writing to db")?; Ok(()) } @@ -138,7 +138,7 @@ impl DBStore { batch.delete_cf(&cf, &key.1); } - self.db.write(batch)?; + self.db.write(batch).context("Error writing to db")?; Ok(result) } @@ -158,7 +158,7 @@ impl DBStore { for (script_hash, new_heights) in add { batch.merge_cf(&cf, script_hash.to_be_bytes(), to_be_bytes(new_heights)) } - self.db.write(batch)?; + self.db.write(batch).context("Error writing to db")?; Ok(()) } } @@ -245,15 +245,19 @@ impl Store for DBStore { let mut history_map = history_map; // TODO should be a db tx let only_outpoints: Vec<_> = utxo_spent.iter().map(|e| e.0).collect(); - let script_hashes = self.remove_utxos(&only_outpoints)?; + let script_hashes = self + .remove_utxos(&only_outpoints) + .context("Error removing utxos")?; for (script_hash, (_, txid)) in script_hashes.into_iter().zip(utxo_spent) { let el = history_map.entry(script_hash).or_default(); el.push(TxSeen::new(txid, block_meta.height())); } self.set_hash_ts(block_meta); - self.update_history(&history_map)?; - self.insert_utxos(&utxo_created)?; + self.update_history(&history_map) + .context("Error updating history")?; + self.insert_utxos(&utxo_created) + .context("Error inserting utxos")?; Ok(()) } diff --git a/src/test_env.rs b/src/test_env.rs index 2d071bb..5334bb3 100644 --- a/src/test_env.rs +++ b/src/test_env.rs @@ -25,7 +25,7 @@ use elements::{ use hyper::HeaderMap; use serde::{Deserialize, Serialize}; use serde_json::Value; -use tokio::sync::oneshot::{self, Receiver, Sender}; +use tokio::sync::watch::{self, Sender}; pub struct TestEnv { #[allow(dead_code)] @@ -89,8 +89,8 @@ async fn inner_launch_with_node(elementsd: BitcoinD, path: Option) -> T } } - let (tx, rx) = oneshot::channel(); - let handle = tokio::spawn(inner_main(args, shutdown_signal(rx))); + let (tx, rx) = watch::channel::<()>(()); + let handle = tokio::spawn(inner_main(args, rx)); let client = WaterfallClient::new(base_url.to_string()); let secp = Secp256k1::new(); @@ -257,10 +257,6 @@ impl TestEnv { } } -async fn shutdown_signal(rx: Receiver<()>) { - rx.await.unwrap() -} - #[derive(Serialize, Deserialize)] pub struct Input { pub txid: String, diff --git a/src/threads/blocks.rs b/src/threads/blocks.rs index 80c4aee..dfeb931 100644 --- a/src/threads/blocks.rs +++ b/src/threads/blocks.rs @@ -9,12 +9,29 @@ use std::{ collections::{HashMap, HashSet}, str::FromStr, sync::Arc, - time::Instant, + time::{Duration, Instant}, }; +use tokio::{sync::watch::Receiver, time::sleep}; -pub(crate) async fn blocks_infallible(shared_state: Arc, client: Client) { - if let Err(e) = index(shared_state, client).await { - log::error!("{:?}", e); +pub(crate) async fn blocks_infallible( + shared_state: Arc, + client: Client, + shutdown_signal: Receiver<()>, +) { + loop { + let mut signal = shutdown_signal.clone(); + tokio::select! { + _ = signal.changed() => { + log::info!("Shutdown signal received, exiting blocks thread"); + return; + } + res = index(shared_state.clone(), client.clone()) => { + if let Err(e) = res { + log::error!("Error indexing blocks: {:?}", e); + sleep(Duration::from_secs(10)).await + } + } + } } } @@ -79,9 +96,10 @@ pub async fn index(state: Arc, client: Client) -> Result<(), Error> { } let meta = BlockMeta::new(block_height, block.block_hash(), block.header.time); - state.set_hash_ts(&meta).await; db.update(&meta, utxo_spent, history_map, utxo_created) + .inspect_err(|e| log::error!("Error updating store: {e:?}")) .map_err(|_| Error::Other)?; // TODO + state.set_hash_ts(&meta).await; } Ok(()) }