Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion payjoin-cli/src/app/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ fn handle_subcommands(config: Builder, cli: &Cli) -> Result<Builder, ConfigError
#[cfg(feature = "v2")]
Commands::History => Ok(config),
#[cfg(feature = "v2")]
Commands::Fallback { .. } => Ok(config),
Commands::Cancel { .. } => Ok(config),
}
}

Expand Down
2 changes: 1 addition & 1 deletion payjoin-cli/src/app/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ pub trait App: Send + Sync {
#[cfg(feature = "v2")]
async fn history(&self) -> Result<()>;
#[cfg(feature = "v2")]
async fn fallback_sender(&self, session_id: SessionId) -> Result<()>;
async fn cancel_sender(&self, session_id: SessionId, no_broadcast: bool) -> Result<()>;

fn create_original_psbt(
&self,
Expand Down
8 changes: 6 additions & 2 deletions payjoin-cli/src/app/v1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,12 @@ impl AppTrait for App {
}

#[cfg(feature = "v2")]
async fn fallback_sender(&self, _session_id: crate::db::v2::SessionId) -> Result<()> {
anyhow::bail!("fallback is only supported for v2 (BIP77) sessions")
async fn cancel_sender(
&self,
_session_id: crate::db::v2::SessionId,
_no_broadcast: bool,
) -> Result<()> {
anyhow::bail!("cancel is only supported for v2 (BIP77) sessions")
}
}

Expand Down
63 changes: 40 additions & 23 deletions payjoin-cli/src/app/v2/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ use payjoin::receive::v2::{
WantsOutputs,
};
use payjoin::send::v2::{
replay_event_log as replay_sender_event_log, PollingForProposal, SendSession, Sender,
SenderBuilder, SessionOutcome as SenderSessionOutcome, WithReplyKey,
replay_event_log as replay_sender_event_log, PendingFallback, PollingForProposal, SendSession,
Sender, SenderBuilder, SessionOutcome as SenderSessionOutcome, WithReplyKey,
};
use payjoin::{ImplementationError, PjParam, Uri};
use tokio::sync::watch;
Expand Down Expand Up @@ -57,6 +57,7 @@ impl StatusText for SendSession {
SenderSessionOutcome::Success(_) => "Session success",
SenderSessionOutcome::Cancel => "Session cancelled",
},
SendSession::PendingFallback(_) => "Session awaiting fallback",
}
}
}
Expand Down Expand Up @@ -256,15 +257,15 @@ impl AppTrait for App {
Ok(()) => return Ok(()),
Err(err) => {
let id = persister.session_id();
println!("Session {id} failed. Run `payjoin-cli fallback {id}` to broadcast the original transaction.");
println!("Session {id} failed. Run `payjoin-cli cancel {id}` to cancel and broadcast the original transaction.");
return Err(err);
}
}
},
_ = interrupt.changed() => {
let id = persister.session_id();
println!(
"Session {id} interrupted. Call `send` again to resume, `resume` to resume all sessions, or `payjoin-cli fallback {id}` to broadcast the original transaction."
"Session {id} interrupted. Call `send` again to resume, `resume` to resume all sessions, or `payjoin-cli cancel {id}` to cancel and broadcast the original transaction."
);
return Err(anyhow!("Interrupted"))
}
Expand Down Expand Up @@ -484,29 +485,41 @@ impl AppTrait for App {
Ok(())
}

async fn fallback_sender(&self, session_id: SessionId) -> Result<()> {
async fn cancel_sender(&self, session_id: SessionId, no_broadcast: bool) -> Result<()> {
let persister = SenderPersister::from_id(self.db.clone(), session_id.clone());
let (session, history) = replay_sender_event_log(&persister)?;
let (session, _history) = replay_sender_event_log(&persister)?;

if let SendSession::Closed(SenderSessionOutcome::Success(proposal)) = session {
let txid = proposal.clone().extract_tx_unchecked_fee_rate().compute_txid();
let pending: Sender<PendingFallback> = match session {
SendSession::WithReplyKey(sender) => sender.cancel().save(&persister)?,
SendSession::PollingForProposal(sender) => sender.cancel().save(&persister)?,
SendSession::PendingFallback(sender) => sender,
SendSession::Closed(SenderSessionOutcome::Success(proposal)) => {
let txid = proposal.extract_tx_unchecked_fee_rate().compute_txid();
println!(
"Session {session_id} already produced payjoin transaction {txid}. \
Cannot cancel a completed session."
);
return Ok(());
}
SendSession::Closed(_) => {
println!("Session {session_id} is already closed. Nothing left to do.");
return Ok(());
}
};

if no_broadcast {
println!(
"Session {session_id} already produced payjoin transaction {txid}. \
Broadcasting the original now would double-spend against it. \
If the payjoin tx needs re-broadcast, run \
`bitcoin-cli gettransaction {txid}` to fetch the hex, then \
`bitcoin-cli sendrawtransaction <hex>`."
"Session {session_id} cancelled. Broadcast the original transaction manually:\n{}",
serialize_hex(pending.fallback_tx())
);
} else {
self.wallet().broadcast_tx(pending.fallback_tx())?;
println!(
"Broadcasted fallback transaction txid: {}",
pending.fallback_tx().compute_txid()
);
return Ok(());
}

let fallback_tx = history.fallback_tx();
self.wallet().broadcast_tx(&fallback_tx)?;
println!("Broadcasted fallback transaction txid: {}", fallback_tx.compute_txid());

if let Err(e) = SessionPersister::close(&persister) {
tracing::warn!("Failed to close session {session_id} after fallback: {e}");
}
pending.close().save(&persister)?;
Ok(())
}
}
Expand Down Expand Up @@ -539,9 +552,13 @@ impl App {
}
SendSession::Closed(SenderSessionOutcome::Failure)
| SendSession::Closed(SenderSessionOutcome::Cancel) => {
println!("Session is closed. Nothing left to do");
return Ok(());
}
SendSession::PendingFallback(_) => {
let id = persister.session_id();
println!(
"Session {id} ended without payjoin. Run `payjoin-cli fallback {id}` to broadcast the original transaction."
"Session {id} was cancelled. Run `payjoin-cli cancel {id}` to cancel and broadcast the original transaction."
);
return Ok(());
}
Expand Down
10 changes: 7 additions & 3 deletions payjoin-cli/src/cli/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,11 +133,15 @@ pub enum Commands {
/// Show payjoin session history
History,
#[cfg(feature = "v2")]
/// Broadcast the original transaction for a sender session (BIP77/v2 only)
Fallback {
/// The session ID to broadcast the fallback transaction for
/// Cancel a sender session, broadcasting the fallback transaction by default (BIP77/v2 only)
Cancel {
/// The session ID to cancel
#[arg(required = true)]
session_id: i64,

/// Cancel without broadcasting the fallback transaction
#[arg(long = "no-broadcast")]
no_broadcast: bool,
},
}

Expand Down
4 changes: 2 additions & 2 deletions payjoin-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,8 @@ async fn main() -> Result<()> {
app.history().await?;
}
#[cfg(feature = "v2")]
Commands::Fallback { session_id } => {
app.fallback_sender(SessionId(*session_id)).await?;
Commands::Cancel { session_id, no_broadcast } => {
app.cancel_sender(SessionId(*session_id), *no_broadcast).await?;
}
};

Expand Down
35 changes: 15 additions & 20 deletions payjoin-cli/tests/e2e.rs
Original file line number Diff line number Diff line change
Expand Up @@ -624,7 +624,7 @@ mod e2e {

#[cfg(feature = "v2")]
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn sender_fallback_v2() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
async fn sender_cancel_v2() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
use payjoin_test_utils::{init_tracing, TestServices};
use tempfile::TempDir;

Expand All @@ -637,12 +637,12 @@ mod e2e {
let result = tokio::select! {
res = services.take_ohttp_relay_handle() => Err(format!("Ohttp relay is long running: {res:?}").into()),
res = services.take_directory_handle() => Err(format!("Directory server is long running: {res:?}").into()),
res = fallback_cli_async(&services, &temp_dir) => res,
res = cancel_cli_async(&services, &temp_dir) => res,
};

assert!(result.is_ok(), "sender_fallback_v2 failed: {:#?}", result.unwrap_err());
assert!(result.is_ok(), "sender_cancel_v2 failed: {:#?}", result.unwrap_err());

async fn fallback_cli_async(services: &TestServices, temp_dir: &TempDir) -> Result<()> {
async fn cancel_cli_async(services: &TestServices, temp_dir: &TempDir) -> Result<()> {
let sender_db_path = temp_dir.path().join("sender_db");
let (bitcoind, sender, _receiver) = init_bitcoind_sender_receiver(None, None)?;
let cert_path = &temp_dir.path().join("localhost.der");
Expand Down Expand Up @@ -684,7 +684,7 @@ mod e2e {
.expect("Failed to execute payjoin-cli receiver");
let bip21 = get_bip21_from_receiver(cli_receiver).await;

// Start sender and capture the session-id from the hint line, then interrupt
// Start sender and let it time out waiting for a response
let cli_sender = Command::new(payjoin_cli)
.arg("--root-certificate")
.arg(cert_path)
Expand All @@ -709,13 +709,9 @@ mod e2e {

// There is only one sender session in progress.
let session_id = 1i64;
// Ensure the fallback was not broadcast yet
let mempool_size =
sender.get_mempool_info().expect("should be able to get mempool").unbroadcast_count;
assert_eq!(mempool_size, 0, "fallback should not be in mempool");

// Run `payjoin-cli fallback <session-id>` and assert broadcast
let mut cli_fallback = Command::new(payjoin_cli)
// Run `payjoin-cli cancel <session-id>`: cancels and broadcasts the fallback tx
let mut cli_cancel = Command::new(payjoin_cli)
.arg("--root-certificate")
.arg(cert_path)
.arg("--rpchost")
Expand All @@ -726,32 +722,31 @@ mod e2e {
.arg(&sender_db_path)
.arg("--ohttp-relays")
.arg(ohttp_relay)
.arg("fallback")
.arg("cancel")
.arg(session_id.to_string())
.stdout(Stdio::piped())
.stderr(Stdio::inherit())
.spawn()
.expect("Failed to execute payjoin-cli fallback");
.expect("Failed to execute payjoin-cli cancel");

let mut fallback_stdout =
cli_fallback.stdout.take().expect("failed to take stdout of fallback");
let mut cancel_stdout =
cli_cancel.stdout.take().expect("failed to take stdout of cancel");
let timeout = tokio::time::Duration::from_secs(10);
let broadcast_line = tokio::time::timeout(
timeout,
wait_for_stdout_match(&mut fallback_stdout, |l| {
wait_for_stdout_match(&mut cancel_stdout, |l| {
l.contains("Broadcasted fallback transaction txid")
}),
)
.await?;

terminate(cli_fallback).await.expect("Failed to kill payjoin-cli fallback");
let subcommand_output = broadcast_line.expect("fallback should broadcast");
terminate(cli_cancel).await.expect("Failed to kill payjoin-cli cancel");
let subcommand_output = broadcast_line.expect("cancel should broadcast fallback tx");
let fallback_txid = subcommand_output.split_whitespace().nth(4).unwrap_or("");
let fallback_txid = Txid::from_str(fallback_txid).expect("valid txid");

assert!(
sender.get_raw_transaction(fallback_txid).is_ok(),
"fallback tx should be in the mempool"
"fallback tx should be in the mempool after cancel"
);

Ok(())
Expand Down
30 changes: 18 additions & 12 deletions payjoin-ffi/csharp/UnitTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -211,13 +211,16 @@ public void SenderCancelFromWithReplyKey()
.BuildRecommended(1000)
.Save(senderPersister);
var cancelTransition = withReplyKey.Cancel();
var fallbackTx = cancelTransition.Save(senderPersister);
Assert.NotNull(fallbackTx);
Assert.NotEmpty(fallbackTx);
var pendingFallback = cancelTransition.Save(senderPersister);
Assert.NotNull(pendingFallback);
Assert.NotEmpty(pendingFallback.FallbackTx());

var result = PayjoinMethods.ReplaySenderEventLog(senderPersister);
var state = result.State();
Assert.IsType<SendSession.Closed>(state);
var cancelledResult = PayjoinMethods.ReplaySenderEventLog(senderPersister);
Assert.IsType<SendSession.PendingFallback>(cancelledResult.State());

pendingFallback.Close().Save(senderPersister);
var closedResult = PayjoinMethods.ReplaySenderEventLog(senderPersister);
Assert.IsType<SendSession.Closed>(closedResult.State());
}

[Fact]
Expand All @@ -238,13 +241,16 @@ public async Task SenderCancelFromWithReplyKeyAsync()
.BuildRecommended(1000)
.SaveAsync(senderPersister);
var cancelTransition = withReplyKey.Cancel();
var fallbackTx = await cancelTransition.SaveAsync(senderPersister);
Assert.NotNull(fallbackTx);
Assert.NotEmpty(fallbackTx);
var pendingFallback = await cancelTransition.SaveAsync(senderPersister);
Assert.NotNull(pendingFallback);
Assert.NotEmpty(pendingFallback.FallbackTx());

var result = await PayjoinMethods.ReplaySenderEventLogAsync(senderPersister);
var state = result.State();
Assert.IsType<SendSession.Closed>(state);
var cancelledResult = await PayjoinMethods.ReplaySenderEventLogAsync(senderPersister);
Assert.IsType<SendSession.PendingFallback>(cancelledResult.State());

await pendingFallback.Close().SaveAsync(senderPersister);
var closedResult = await PayjoinMethods.ReplaySenderEventLogAsync(senderPersister);
Assert.IsType<SendSession.Closed>(closedResult.State());
}
}

Expand Down
44 changes: 32 additions & 12 deletions payjoin-ffi/dart/test/test_payjoin_unit_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -183,14 +183,25 @@ void main() {
.buildRecommended(minFeeRateSatPerKwu: 1000)
.save(persister: sender_persister);
var cancelTransition = withReplyKey.cancel();
var fallbackTx = cancelTransition.save(persister: sender_persister);
expect(fallbackTx, isNotNull);
expect(fallbackTx.length, greaterThan(0));
final result = payjoin.replaySenderEventLog(persister: sender_persister);
var pendingFallback = cancelTransition.save(persister: sender_persister);
expect(pendingFallback, isNotNull);
expect(pendingFallback!.fallbackTx().length, greaterThan(0));
final cancelledResult = payjoin.replaySenderEventLog(
persister: sender_persister,
);
expect(
result.state(),
cancelledResult.state(),
isA<payjoin.PendingFallbackSendSession>(),
reason: "sender should be in PendingFallback state after cancel",
);
pendingFallback.close().save(persister: sender_persister);
final closedResult = payjoin.replaySenderEventLog(
persister: sender_persister,
);
expect(
closedResult.state(),
isA<payjoin.ClosedSendSession>(),
reason: "sender should be in Closed state after cancel",
reason: "sender should be in Closed state after close",
);
});

Expand All @@ -215,18 +226,27 @@ void main() {
.buildRecommended(minFeeRateSatPerKwu: 1000)
.saveAsync(persister: sender_persister);
var cancelTransition = withReplyKey.cancel();
var fallbackTx = await cancelTransition.saveAsync(
var pendingFallback = await cancelTransition.saveAsync(
persister: sender_persister,
);
expect(fallbackTx, isNotNull);
expect(fallbackTx.length, greaterThan(0));
final result = await payjoin.replaySenderEventLogAsync(
expect(pendingFallback, isNotNull);
expect(pendingFallback!.fallbackTx().length, greaterThan(0));
final cancelledResult = await payjoin.replaySenderEventLogAsync(
persister: sender_persister,
);
expect(
result.state(),
cancelledResult.state(),
isA<payjoin.PendingFallbackSendSession>(),
reason: "sender should be in PendingFallback state after cancel",
);
await pendingFallback.close().saveAsync(persister: sender_persister);
final closedResult = await payjoin.replaySenderEventLogAsync(
persister: sender_persister,
);
expect(
closedResult.state(),
isA<payjoin.ClosedSendSession>(),
reason: "sender should be in Closed state after cancel",
reason: "sender should be in Closed state after close",
);
});
});
Expand Down
Loading
Loading