Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions payjoin-cli/src/app/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,10 @@ fn handle_subcommands(config: Builder, cli: &Cli) -> Result<Builder, ConfigError
Commands::History => Ok(config),
#[cfg(feature = "v2")]
Commands::Fallback { .. } => Ok(config),
#[cfg(feature = "v2")]
Commands::Cancel { .. } => Ok(config),
#[cfg(feature = "v2")]
Commands::CancelWithoutBroadcast { .. } => Ok(config),
}
}

Expand Down
4 changes: 4 additions & 0 deletions payjoin-cli/src/app/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ pub trait App: Send + Sync {
async fn history(&self) -> Result<()>;
#[cfg(feature = "v2")]
async fn fallback_sender(&self, session_id: SessionId) -> Result<()>;
#[cfg(feature = "v2")]
async fn cancel_receiver(&self, session_id: SessionId) -> Result<()>;
#[cfg(feature = "v2")]
async fn cancel_receiver_without_broadcast(&self, session_id: SessionId) -> Result<()>;

fn create_original_psbt(
&self,
Expand Down
13 changes: 13 additions & 0 deletions payjoin-cli/src/app/v1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,19 @@ impl AppTrait for App {
async fn fallback_sender(&self, _session_id: crate::db::v2::SessionId) -> Result<()> {
anyhow::bail!("fallback is only supported for v2 (BIP77) sessions")
}

#[cfg(feature = "v2")]
async fn cancel_receiver(&self, _session_id: crate::db::v2::SessionId) -> Result<()> {
anyhow::bail!("receiver cancellation is only supported for v2 (BIP77) sessions")
}

#[cfg(feature = "v2")]
async fn cancel_receiver_without_broadcast(
&self,
_session_id: crate::db::v2::SessionId,
) -> Result<()> {
anyhow::bail!("receiver cancellation is only supported for v2 (BIP77) sessions")
}
}

impl App {
Expand Down
167 changes: 156 additions & 11 deletions payjoin-cli/src/app/v2/mod.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
use std::fmt;
use std::io::{self, Write};
use std::sync::{Arc, Mutex};

use anyhow::{anyhow, Context, Result};
use payjoin::bitcoin::consensus::encode::serialize_hex;
use payjoin::bitcoin::{Amount, FeeRate};
use payjoin::bitcoin::{Amount, FeeRate, Transaction};
use payjoin::persist::{OptionalTransitionOutcome, SessionPersister};
use payjoin::receive::v2::{
replay_event_log as replay_receiver_event_log, HasReplyableError, Initialized,
MaybeInputsOwned, MaybeInputsSeen, Monitor, OutputsUnknown, PayjoinProposal,
MaybeInputsOwned, MaybeInputsSeen, Monitor, OutputsUnknown, PayjoinProposal, PendingFallback,
ProvisionalProposal, ReceiveSession, Receiver, ReceiverBuilder,
SessionOutcome as ReceiverSessionOutcome, UncheckedOriginalPayload, WantsFeeRange, WantsInputs,
WantsOutputs,
Expand Down Expand Up @@ -77,6 +78,7 @@ impl StatusText for ReceiveSession {
ReceiveSession::HasReplyableError(_) =>
"Session failure, waiting to post error response",
ReceiveSession::Monitor(_) => "Monitoring payjoin proposal",
ReceiveSession::PendingFallback(_) => "Original transaction awaiting fallback decision",
ReceiveSession::Closed(session_outcome) => match session_outcome {
ReceiverSessionOutcome::Failure => "Session failure",
ReceiverSessionOutcome::Success(_) => "Session success, Payjoin proposal was broadcasted",
Expand Down Expand Up @@ -109,6 +111,12 @@ impl Role {
}
}

#[derive(Clone, Copy)]
enum FallbackHandling {
Prompt,
CloseWithoutBroadcast,
}

struct SessionHistoryRow<Status> {
session_id: SessionId,
role: Role,
Expand Down Expand Up @@ -315,11 +323,22 @@ impl AppTrait for App {
let self_clone = self.clone();
let recv_persister = ReceiverPersister::from_id(self.db.clone(), session_id.clone());
match replay_receiver_event_log(&recv_persister) {
Ok((receiver_state, _)) => {
tasks.push(tokio::spawn(async move {
self_clone.process_receiver_session(receiver_state, &recv_persister).await
}));
Ok((ReceiveSession::PendingFallback(pending_fallback), _)) => {
if let Err(e) = self.complete_pending_fallback(
pending_fallback,
&recv_persister,
FallbackHandling::Prompt,
) {
tracing::error!(
"An error {:?} occurred while handling receiver session {}",
e,
session_id
);
}
}
Ok((receiver_state, _)) => tasks.push(tokio::spawn(async move {
self_clone.process_receiver_session(receiver_state, &recv_persister).await
})),
Err(e) => {
tracing::error!("An error {:?} occurred while replaying receiver session", e);
Self::close_failed_session(&recv_persister, &session_id, "receiver");
Expand Down Expand Up @@ -509,6 +528,15 @@ impl AppTrait for App {
}
Ok(())
}

async fn cancel_receiver(&self, session_id: SessionId) -> Result<()> {
self.cancel_receiver_with_handling(session_id, FallbackHandling::Prompt).await
}

async fn cancel_receiver_without_broadcast(&self, session_id: SessionId) -> Result<()> {
self.cancel_receiver_with_handling(session_id, FallbackHandling::CloseWithoutBroadcast)
.await
}
}

impl App {
Expand All @@ -523,6 +551,95 @@ impl App {
}
}

async fn cancel_receiver_with_handling(
&self,
session_id: SessionId,
handling: FallbackHandling,
) -> Result<()> {
let persister = ReceiverPersister::from_id(self.db.clone(), session_id.clone());
let (session, _) = replay_receiver_event_log(&persister)?;

macro_rules! cancel_to_pending_fallback {
($state:expr) => {{
let pending_fallback = $state.cancel().save(&persister)?;
self.complete_pending_fallback(pending_fallback, &persister, handling)
}};
}

match session {
ReceiveSession::Initialized(state) => {
state.cancel().save(&persister)?;
println!("Receiver session {session_id} cancelled.");
Ok(())
}
ReceiveSession::UncheckedOriginalPayload(state) => {
state.cancel().save(&persister)?;
println!("Receiver session {session_id} cancelled.");
Ok(())
}
ReceiveSession::MaybeInputsOwned(state) => cancel_to_pending_fallback!(state),
ReceiveSession::MaybeInputsSeen(state) => cancel_to_pending_fallback!(state),
ReceiveSession::OutputsUnknown(state) => cancel_to_pending_fallback!(state),
ReceiveSession::WantsOutputs(state) => cancel_to_pending_fallback!(state),
ReceiveSession::WantsInputs(state) => cancel_to_pending_fallback!(state),
ReceiveSession::WantsFeeRange(state) => cancel_to_pending_fallback!(state),
ReceiveSession::ProvisionalProposal(state) => cancel_to_pending_fallback!(state),
ReceiveSession::PayjoinProposal(state) => cancel_to_pending_fallback!(state),
ReceiveSession::Monitor(state) => cancel_to_pending_fallback!(state),
ReceiveSession::HasReplyableError(state) => match state.cancel().save(&persister)? {
Some(pending_fallback) =>
self.complete_pending_fallback(pending_fallback, &persister, handling),
None => {
println!("Receiver session {session_id} cancelled.");
Ok(())
}
},
ReceiveSession::PendingFallback(pending_fallback) =>
self.complete_pending_fallback(pending_fallback, &persister, handling),
ReceiveSession::Closed(outcome) => {
println!("Receiver session {session_id} is already closed: {outcome:?}");
Ok(())
}
}
}

fn complete_pending_fallback(
&self,
pending_fallback: Receiver<PendingFallback>,
persister: &ReceiverPersister,
handling: FallbackHandling,
) -> Result<()> {
let should_broadcast = match handling {
FallbackHandling::Prompt =>
self.prompt_broadcast_fallback(pending_fallback.fallback_tx())?,
FallbackHandling::CloseWithoutBroadcast => false,
};

if should_broadcast {
let txid = self.wallet().broadcast_tx(pending_fallback.fallback_tx())?;
println!("Broadcasted fallback transaction txid: {txid}");
} else {
println!("Closing receiver session without broadcasting the fallback transaction.");
}

pending_fallback.close().save(persister)?;
Ok(())
}

fn prompt_broadcast_fallback(&self, fallback_tx: &Transaction) -> Result<bool> {
println!(
"Original transaction is pending fallback handling. TXID: {}",
fallback_tx.compute_txid()
);
print!("Broadcast the original transaction before closing? [Y/n]: ");
io::stdout().flush()?;

let mut answer = String::new();
io::stdin().read_line(&mut answer)?;
let answer = answer.trim().to_ascii_lowercase();
Ok(!matches!(answer.as_str(), "n" | "no" | "c" | "close"))
}

async fn process_sender_session(
&self,
session: SendSession,
Expand Down Expand Up @@ -656,6 +773,14 @@ impl App {
self.handle_error(error, persister).await,
ReceiveSession::Monitor(proposal) =>
self.monitor_payjoin_proposal(proposal, persister).await,
ReceiveSession::PendingFallback(pending_fallback) => {
self.complete_pending_fallback(
pending_fallback,
persister,
FallbackHandling::Prompt,
)?;
Ok(())
}
ReceiveSession::Closed(_) => return Err(anyhow!("Session closed")),
}
};
Expand Down Expand Up @@ -807,7 +932,23 @@ impl App {
.map_err(|e| anyhow!("v2 req extraction failed {}", e))?;
let res = self.post_request(req).await?;
let payjoin_psbt = proposal.psbt().clone();
let session = proposal.process_response(&res.bytes().await?, ohttp_ctx).save(persister)?;
let session =
match proposal.process_response(&res.bytes().await?, ohttp_ctx).save(persister) {
Ok(session) => session,
Err(e) => {
let message = e.to_string();
if let Some(pending_fallback) = e.error_state() {
println!("Payjoin proposal post failed: {message}");
self.complete_pending_fallback(
pending_fallback,
persister,
FallbackHandling::Prompt,
)?;
return Ok(());
}
return Err(anyhow!("Failed to process payjoin proposal response: {message}"));
}
};
println!(
"Response successful. Watch mempool for successful Payjoin. TXID: {}",
payjoin_psbt.extract_tx_unchecked_fee_rate().compute_txid()
Expand Down Expand Up @@ -901,11 +1042,15 @@ impl App {
Err(e) => return Err(anyhow!("Failed to get error response bytes: {}", e)),
};

if let Err(e) = session.process_error_response(&err_bytes, err_ctx).save(persister) {
return Err(anyhow!("Failed to process error response: {}", e));
match session.process_error_response(&err_bytes, err_ctx).save(persister) {
Ok(Some(pending_fallback)) => self.complete_pending_fallback(
pending_fallback,
persister,
FallbackHandling::Prompt,
),
Ok(None) => Ok(()),
Err(e) => Err(anyhow!("Failed to process error response: {}", e)),
}

Ok(())
}

async fn post_request(&self, req: payjoin::Request) -> Result<reqwest::Response> {
Expand Down
15 changes: 15 additions & 0 deletions payjoin-cli/src/cli/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,21 @@ pub enum Commands {
#[arg(required = true)]
session_id: i64,
},
#[cfg(feature = "v2")]
/// Cancel a receiver session and prompt for fallback handling
Cancel {
/// The receiver session ID to cancel
#[arg(required = true)]
session_id: i64,
},
#[cfg(feature = "v2")]
/// Cancel a receiver session without broadcasting the fallback transaction
#[command(name = "cancel-without-broadcast")]
CancelWithoutBroadcast {
/// The receiver session ID to cancel
#[arg(required = true)]
session_id: i64,
},
}

pub fn parse_amount_in_sat(s: &str) -> Result<Amount, ParseAmountError> {
Expand Down
8 changes: 8 additions & 0 deletions payjoin-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,14 @@ async fn main() -> Result<()> {
Commands::Fallback { session_id } => {
app.fallback_sender(SessionId(*session_id)).await?;
}
#[cfg(feature = "v2")]
Commands::Cancel { session_id } => {
app.cancel_receiver(SessionId(*session_id)).await?;
}
#[cfg(feature = "v2")]
Commands::CancelWithoutBroadcast { session_id } => {
app.cancel_receiver_without_broadcast(SessionId(*session_id)).await?;
}
};

Ok(())
Expand Down
3 changes: 3 additions & 0 deletions payjoin-ffi/javascript/wasm-manifest-patch.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,6 @@ features = ["wasm-unstable-single-threaded"]
payjoin = { path = "../../../../payjoin" }
payjoin-mailroom = { path = "../../../../payjoin-mailroom" }
payjoin-test-utils = { path = "../../../../payjoin-test-utils" }

[patch."https://github.com/payjoin/rust-payjoin.git"]
payjoin = { path = "../../../../payjoin" }
Loading
Loading