Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/fetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use tokio::time::sleep;

use crate::server::{Arguments, Network};

#[derive(Clone)]
pub struct Client {
client: reqwest::Client,
use_esplora: bool,
Expand Down
19 changes: 13 additions & 6 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,29 @@
use clap::Parser;
use env_logger::Env;
use std::io::Write;
use tokio::sync::watch::{self, Receiver};
use waterfalls::server::{inner_main, Arguments};

#[tokio::main(flavor = "current_thread")]
async fn main() {
init_logging();

let args = Arguments::parse();
let shutdown_signal = shutdown_signal().await;

inner_main(args, shutdown_signal()).await.unwrap(); // we want to panic in case of error so that the process exit with non-zero value
inner_main(args, shutdown_signal).await.unwrap(); // we want to panic in case of error so that the process exit with non-zero value
}

async fn shutdown_signal() {
// Wait for the CTRL+C signal
tokio::signal::ctrl_c()
.await
.expect("failed to install CTRL+C signal handler");
async fn shutdown_signal() -> Receiver<()> {
let (shutdown_sender, shutdown_receiver) = watch::channel::<()>(());
tokio::spawn(async move {
// Wait for the CTRL+C signal
tokio::signal::ctrl_c()
.await
.expect("failed to install CTRL+C signal handler");
let _ = shutdown_sender.send(());
});
shutdown_receiver
}

fn init_logging() {
Expand Down
12 changes: 7 additions & 5 deletions src/server/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
//! Inside this module everything is needed to run the service providing the waterfalls protocol

use std::future::Future;
use std::net::SocketAddr;
use std::sync::Arc;

Expand All @@ -18,6 +17,7 @@ use hyper::service::service_fn;
use hyper_util::rt::TokioIo;
use route::infallible_route;
use tokio::net::TcpListener;
use tokio::sync::watch::Receiver;
use tokio::sync::Mutex;

pub mod encryption;
Expand Down Expand Up @@ -219,7 +219,7 @@ fn get_store(args: &Arguments) -> Result<AnyStore, Error> {

pub async fn inner_main(
args: Arguments,
shutdown_signal: impl Future<Output = ()>,
shutdown_signal: Receiver<()>,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
args.is_valid()?;

Expand Down Expand Up @@ -254,7 +254,8 @@ pub async fn inner_main(
let _h1 = {
let state = state.clone();
let client: Client = Client::new(&args);
tokio::spawn(async move { blocks_infallible(state, client).await })
let shutdown_signal = shutdown_signal.clone();
tokio::spawn(async move { blocks_infallible(state, client, shutdown_signal).await })
};

let _h2 = {
Expand All @@ -272,9 +273,10 @@ pub async fn inner_main(
let listener = TcpListener::bind(addr).await?;
let client = Client::new(&args);
let client = Arc::new(Mutex::new(client));
let mut signal = std::pin::pin!(shutdown_signal);

loop {
let mut signal = shutdown_signal.clone();

tokio::select! {
Ok( (stream, _)) = listener.accept() => {
let io = TokioIo::new(stream);
Expand All @@ -295,7 +297,7 @@ pub async fn inner_main(
});
},

_ = &mut signal => {
_ = signal.changed() => {
log::info!("graceful shutdown signal received");
// stop the accept loop
break;
Expand Down
16 changes: 10 additions & 6 deletions src/store/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ impl DBStore {
batch.put_cf(&cf, &key_buf, val);
}

self.db.write(batch)?;
self.db.write(batch).context("Error writing to db")?;
Ok(())
}

Expand All @@ -138,7 +138,7 @@ impl DBStore {
batch.delete_cf(&cf, &key.1);
}

self.db.write(batch)?;
self.db.write(batch).context("Error writing to db")?;

Ok(result)
}
Expand All @@ -158,7 +158,7 @@ impl DBStore {
for (script_hash, new_heights) in add {
batch.merge_cf(&cf, script_hash.to_be_bytes(), to_be_bytes(new_heights))
}
self.db.write(batch)?;
self.db.write(batch).context("Error writing to db")?;
Ok(())
}
}
Expand Down Expand Up @@ -245,15 +245,19 @@ impl Store for DBStore {
let mut history_map = history_map;
// TODO should be a db tx
let only_outpoints: Vec<_> = utxo_spent.iter().map(|e| e.0).collect();
let script_hashes = self.remove_utxos(&only_outpoints)?;
let script_hashes = self
.remove_utxos(&only_outpoints)
.context("Error removing utxos")?;
for (script_hash, (_, txid)) in script_hashes.into_iter().zip(utxo_spent) {
let el = history_map.entry(script_hash).or_default();
el.push(TxSeen::new(txid, block_meta.height()));
}

self.set_hash_ts(block_meta);
self.update_history(&history_map)?;
self.insert_utxos(&utxo_created)?;
self.update_history(&history_map)
.context("Error updating history")?;
self.insert_utxos(&utxo_created)
.context("Error inserting utxos")?;

Ok(())
}
Expand Down
10 changes: 3 additions & 7 deletions src/test_env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use elements::{
use hyper::HeaderMap;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use tokio::sync::oneshot::{self, Receiver, Sender};
use tokio::sync::watch::{self, Sender};

pub struct TestEnv {
#[allow(dead_code)]
Expand Down Expand Up @@ -89,8 +89,8 @@ async fn inner_launch_with_node(elementsd: BitcoinD, path: Option<PathBuf>) -> T
}
}

let (tx, rx) = oneshot::channel();
let handle = tokio::spawn(inner_main(args, shutdown_signal(rx)));
let (tx, rx) = watch::channel::<()>(());
let handle = tokio::spawn(inner_main(args, rx));

let client = WaterfallClient::new(base_url.to_string());
let secp = Secp256k1::new();
Expand Down Expand Up @@ -257,10 +257,6 @@ impl TestEnv {
}
}

async fn shutdown_signal(rx: Receiver<()>) {
rx.await.unwrap()
}

#[derive(Serialize, Deserialize)]
pub struct Input {
pub txid: String,
Expand Down
28 changes: 23 additions & 5 deletions src/threads/blocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,29 @@ use std::{
collections::{HashMap, HashSet},
str::FromStr,
sync::Arc,
time::Instant,
time::{Duration, Instant},
};
use tokio::{sync::watch::Receiver, time::sleep};

pub(crate) async fn blocks_infallible(shared_state: Arc<State>, client: Client) {
if let Err(e) = index(shared_state, client).await {
log::error!("{:?}", e);
pub(crate) async fn blocks_infallible(
shared_state: Arc<State>,
client: Client,
shutdown_signal: Receiver<()>,
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need this also in the mempool thread? (not needed in this MR just wondering)

) {
loop {
let mut signal = shutdown_signal.clone();
tokio::select! {
_ = signal.changed() => {
log::info!("Shutdown signal received, exiting blocks thread");
return;
}
res = index(shared_state.clone(), client.clone()) => {
if let Err(e) = res {
log::error!("Error indexing blocks: {:?}", e);
sleep(Duration::from_secs(10)).await
}
}
}
}
}

Expand Down Expand Up @@ -79,9 +96,10 @@ pub async fn index(state: Arc<State>, client: Client) -> Result<(), Error> {
}

let meta = BlockMeta::new(block_height, block.block_hash(), block.header.time);
state.set_hash_ts(&meta).await;
db.update(&meta, utxo_spent, history_map, utxo_created)
.inspect_err(|e| log::error!("Error updating store: {e:?}"))
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we use with_context like in other places?

.map_err(|_| Error::Other)?; // TODO
state.set_hash_ts(&meta).await;
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it seems it's called internally in update() also, should we remove it from here?

}
Ok(())
}
Expand Down