From c6202c880b8d1b8e1676b998935cc337673a5d36 Mon Sep 17 00:00:00 2001 From: Joseph Shearer Date: Wed, 1 Apr 2026 16:32:59 -0400 Subject: [PATCH 1/2] billing: add `--dry-run` mode The invoice generator's trial-run workflow ran against a sandbox Stripe account, but produced inaccurate results because the sandbox lacked livemode invoice state (manual bills with existing open/paid invoices appeared as phantom creates). `--dry-run` runs against livemode Stripe in read-only mode, showing what would happen without creating invoices, customers, or modifying anything. Structural changes: * Split `upsert_invoice` into `classify` (read-only: validation, Stripe searches) and `execute` (writes: customer/invoice creation, line items, verification). `--dry-run` stops after classify. * Decompose `get_or_create_customer_for_tenant` into `find_customer` (read-only search) and `ensure_customer_for_invoicing` (find-or-create + email backfill). * Reorder classify checks so cheap local validations (FreeTier, FutureTrialStart, LessThanMinimum) run before any Stripe API calls. * Multi-month manual bills that already have an `open`, `paid`, `void`, or `uncollectible` invoice in Stripe are now classified as `AlreadyProcessed` instead of erroring. These are expected when date-range-overlapping manual bills were invoiced in a previous billing run. * Per-tenant summary output annotates manual bills with their date range (`[manual: 2026-01-01 - 2026-06-30]`). * Dry-run with `--clean-up` previews which stale draft invoices would be deleted. `--recreate-finalized` logs which invoices would be deleted and recreated. --- crates/billing-integrations/src/publish.rs | 701 +++++++++++++-------- 1 file changed, 455 insertions(+), 246 deletions(-) diff --git a/crates/billing-integrations/src/publish.rs b/crates/billing-integrations/src/publish.rs index f8c3b802c0d..6f888095526 100644 --- a/crates/billing-integrations/src/publish.rs +++ b/crates/billing-integrations/src/publish.rs @@ -59,6 +59,10 @@ pub struct PublishInvoice { /// Clean up dangling invoices that are not in the database #[clap(long, default_value_t = false)] pub clean_up: bool, + /// Run in read-only mode: classify all invoices and report what would + /// happen, without creating or modifying anything in Stripe. + #[clap(long, default_value_t = false)] + pub dry_run: bool, } fn parse_date(arg: &str) -> Result { @@ -102,20 +106,32 @@ enum InvoiceResult { FutureTrialStart, NoDataMoved, NoFullPipeline, + AlreadyProcessed, Error, } impl InvoiceResult { - pub fn message(&self) -> String { + pub fn message(&self, dry_run: bool) -> String { match self { InvoiceResult::Created(provider) => { + let verb = if dry_run { + "Would publish" + } else { + "Published" + }; if provider == &PaymentProvider::Stripe { - "Published new invoice".to_string() + format!("{verb} new invoice") + } else { + format!("{verb} new invoice for tenant using {provider:?} provider") + } + } + InvoiceResult::Updated => { + if dry_run { + "Would update existing invoice".to_string() } else { - format!("Published new invoice for tenant using {provider:?} provider") + "Updated existing invoice".to_string() } } - InvoiceResult::Updated => "Updated existing invoice".to_string(), InvoiceResult::LessThanMinimum => { "Skipping invoice for less than the minimum chargable amount ($0.50)".to_string() } @@ -129,11 +145,31 @@ impl InvoiceResult { InvoiceResult::NoFullPipeline => { "Skipping invoice for tenant without an active pipeline".to_string() } + InvoiceResult::AlreadyProcessed => { + "Skipping invoice already processed in a previous billing run".to_string() + } InvoiceResult::Error => "Error publishing invoices".to_string(), } } } +/// The outcome of the classify phase: what action should be taken for this invoice. +enum InvoiceAction { + /// Invoice should not be created. Carries the skip reason and the + /// customer (if found) for potential clean-up of stale drafts. + Skip { + result: InvoiceResult, + customer: Option, + }, + /// Create a new invoice. `replace` is set when --recreate-finalized + /// requires deleting an existing invoice first. + Create { replace: Option }, + /// Update an existing draft invoice's line items. + Update { + existing_invoice_id: stripe::InvoiceId, + }, +} + #[derive( Serialize, Deserialize, Debug, Clone, PartialEq, PartialOrd, Eq, Ord, Hash, Copy, sqlx::Type, )] @@ -195,92 +231,214 @@ impl Invoice { Ok(invoice_search.into_iter().next()) } - #[tracing::instrument(skip(self, client, db_client), fields(tenant=self.billed_prefix, invoice_type=format!("{:?}",self.invoice_type), subtotal=format!("${:.2}", self.subtotal as f64 / 100.0)))] - async fn upsert_invoice( + /// Read-only classification: determines what action should be taken for this + /// invoice without making any writes to Stripe. + #[tracing::instrument(skip(self, client), fields(tenant=self.billed_prefix, invoice_type=format!("{:?}",self.invoice_type), subtotal=format!("${:.2}", self.subtotal as f64 / 100.0)))] + async fn classify( &self, client: &stripe::Client, - db_client: &Pool, recreate_finalized: bool, - mode: ChargeType, - ) -> anyhow::Result { + ) -> anyhow::Result { + // --- Phase 1: Cheap local checks (no Stripe calls) --- + match (&self.invoice_type, &self.extra) { (InvoiceType::Preview, _) => { bail!("Should not create Stripe invoices for preview invoices") } - (InvoiceType::Final, Some(extra)) => { - // If we have a payment method, don't skip the invoice - // If `has_payment_method` is Some, then there is a stripe customer to check - let validated_has_payment_method = - if let Some(has_payment_method) = self.has_payment_method { - // The Stripe capture in the database has been known to be unreliable. - // Let's double-check with Stripe to make sure it agrees that we really - // do not have a payment method set. - let real_default_payment_method = get_or_create_customer_for_tenant( - client, - db_client, - self.billed_prefix.to_owned(), - false, // If there's no customer, there's no way there can be a payment method - ) - .await? - .and_then(|customer| customer.invoice_settings) - .and_then(|i| i.default_payment_method); - - if has_payment_method != real_default_payment_method.is_some() { - tracing::warn!( - ?has_payment_method, - stripe_payment_method = real_default_payment_method.is_some(), - "Inconsistent payment method state" - ); - } - - real_default_payment_method.is_some() - } else { - false - }; - - let unwrapped_extra = extra.clone().0.expect( - "This is just a sqlx quirk, if the outer Option is Some then this will be Some", - ); - - if !validated_has_payment_method { - if unwrapped_extra.processed_data_gb.unwrap_or_default() == 0.0 - && !matches!(&self.invoice_type, InvoiceType::Manual) - { - return Ok(InvoiceResult::NoDataMoved); - } - - if !self.has_full_pipeline && !matches!(&self.invoice_type, InvoiceType::Manual) - { - return Ok(InvoiceResult::NoFullPipeline); - } - } - } (InvoiceType::Final, None) => { bail!("Invoice should have extra") } _ => {} }; - // An invoice should be generated in Stripe if the tenant is on a paid plan, which means: - // * The tenant has a free trial start date - // * The tenant's free trial start date is before the invoice period's end date if let InvoiceType::Final = self.invoice_type { match self.tenant_trial_start { Some(trial_start) if self.date_end < trial_start => { - return Ok(InvoiceResult::FutureTrialStart); + return Ok(InvoiceAction::Skip { + result: InvoiceResult::FutureTrialStart, + customer: None, + }); } None => { - return Ok(InvoiceResult::FreeTier); + return Ok(InvoiceAction::Skip { + result: InvoiceResult::FreeTier, + customer: None, + }); } _ => {} } } - // The minimum chargable amount of USD in Stripe is $0.50. - // https://stripe.com/docs/currencies#minimum-and-maximum-charge-amounts if self.subtotal < 50 { - return Ok(InvoiceResult::LessThanMinimum); + return Ok(InvoiceAction::Skip { + result: InvoiceResult::LessThanMinimum, + customer: None, + }); + } + + // --- Phase 2: Stripe calls (only for invoices that survived Phase 1) --- + + // For Final invoices, verify the payment method state with Stripe. + // The DB capture has been known to be unreliable, so Stripe is the + // source of truth. If the tenant has no payment method, skip on + // NoDataMoved / NoFullPipeline. + let mut found_customer: Option> = None; + + if let (InvoiceType::Final, Some(extra)) = (&self.invoice_type, &self.extra) { + let validated_has_payment_method = + if let Some(has_payment_method) = self.has_payment_method { + let customer = find_customer(client, &self.billed_prefix).await?; + let real_has_pm = customer + .as_ref() + .and_then(|c| c.invoice_settings.as_ref()) + .and_then(|i| i.default_payment_method.as_ref()) + .is_some(); + + if has_payment_method != real_has_pm { + tracing::warn!( + ?has_payment_method, + stripe_payment_method = real_has_pm, + "Inconsistent payment method state" + ); + } + + found_customer = Some(customer); + real_has_pm + } else { + false + }; + + if !validated_has_payment_method { + let unwrapped_extra = extra.clone().0.expect( + "This is just a sqlx quirk, if the outer Option is Some then this will be Some", + ); + + if unwrapped_extra.processed_data_gb.unwrap_or_default() == 0.0 { + return Ok(InvoiceAction::Skip { + result: InvoiceResult::NoDataMoved, + customer: found_customer.flatten(), + }); + } + + if !self.has_full_pipeline { + return Ok(InvoiceAction::Skip { + result: InvoiceResult::NoFullPipeline, + customer: found_customer.flatten(), + }); + } + } + } + + // Look up customer (reuse if already fetched during payment method validation) + let customer = match found_customer { + Some(c) => c, + None => find_customer(client, &self.billed_prefix).await?, + }; + + let customer = match customer { + Some(c) => c, + // No customer in Stripe means no existing invoice is possible + None => return Ok(InvoiceAction::Create { replace: None }), + }; + + let customer_id = customer.id.to_string(); + + // Search for an existing invoice in Stripe + if let Some(invoice) = self + .get_stripe_invoice(client, customer_id.as_str()) + .await? + { + match invoice.status { + Some(stripe::InvoiceStatus::Open | stripe::InvoiceStatus::Draft) + if recreate_finalized => + { + Ok(InvoiceAction::Create { + replace: Some(invoice.id), + }) + } + Some(stripe::InvoiceStatus::Draft) => { + tracing::debug!( + "Found existing draft invoice {id}", + id = invoice.id.to_string() + ); + Ok(InvoiceAction::Update { + existing_invoice_id: invoice.id, + }) + } + Some(stripe::InvoiceStatus::Open) + if matches!(self.invoice_type, InvoiceType::Manual) => + { + tracing::debug!( + "Manual invoice {id} already open, skipping", + id = invoice.id.to_string() + ); + Ok(InvoiceAction::Skip { + result: InvoiceResult::AlreadyProcessed, + customer: Some(customer), + }) + } + Some(stripe::InvoiceStatus::Open) => { + bail!( + "Found open invoice {id}. Pass --recreate-finalized to delete and recreate this invoice.", + id = invoice.id.to_string() + ) + } + Some( + status @ (stripe::InvoiceStatus::Paid + | stripe::InvoiceStatus::Void + | stripe::InvoiceStatus::Uncollectible), + ) if matches!(self.invoice_type, InvoiceType::Manual) => { + tracing::debug!( + "Manual invoice {id} already in state {status}, skipping", + id = invoice.id.to_string(), + status = status + ); + Ok(InvoiceAction::Skip { + result: InvoiceResult::AlreadyProcessed, + customer: Some(customer), + }) + } + Some(status) => { + bail!( + "Found invoice {id} in unsupported state {status}, skipping.", + id = invoice.id.to_string(), + status = status + ); + } + None => { + bail!( + "Unexpected missing status from invoice {id}", + id = invoice.id.to_string() + ); + } + } + } else { + Ok(InvoiceAction::Create { replace: None }) } + } + + /// Execute the classified action: performs all Stripe writes (customer creation, + /// invoice creation/update, line item management, verification). + #[tracing::instrument(skip(self, client, db_client, action), fields(tenant=self.billed_prefix, invoice_type=format!("{:?}",self.invoice_type), subtotal=format!("${:.2}", self.subtotal as f64 / 100.0)))] + async fn execute( + &self, + client: &stripe::Client, + db_client: &Pool, + action: InvoiceAction, + mode: ChargeType, + ) -> anyhow::Result { + let (is_update, replace, existing_invoice_id) = match action { + InvoiceAction::Skip { result, .. } => return Ok(result), + InvoiceAction::Create { replace, .. } => (false, replace, None), + InvoiceAction::Update { + existing_invoice_id, + .. + } => (true, None, Some(existing_invoice_id)), + }; + + // Ensure customer exists and has an email (required for invoicing) + let customer = + ensure_customer_for_invoicing(client, db_client, &self.billed_prefix).await?; // Anything before 12:00:00 renders as the previous day in Stripe let date_start_secs = self @@ -312,111 +470,89 @@ impl Invoice { .as_str() .expect("InvoiceType is serializable"); - let customer = get_or_create_customer_for_tenant( - client, - db_client, - self.billed_prefix.to_owned(), - true, - ) - .await? - .expect("Should never return None"); - let customer_id = customer.id.to_string(); - - let maybe_invoice = if let Some(invoice) = self - .get_stripe_invoice(&client, customer_id.as_str()) - .await? - { - match invoice.status { - Some(state @ (stripe::InvoiceStatus::Open | stripe::InvoiceStatus::Draft)) - if recreate_finalized => - { + // Delete existing invoice if --recreate-finalized was used + if let Some(ref replace_id) = replace { + // Re-verify the invoice status before deleting (guard against race conditions) + let existing = stripe::Invoice::retrieve(client, replace_id, &[]).await?; + match existing.status { + Some(state @ (stripe::InvoiceStatus::Open | stripe::InvoiceStatus::Draft)) => { tracing::warn!( - "Found invoice {id} in state {state} deleting and recreating", - id = invoice.id.to_string(), + "Found invoice {id} in state {state}, deleting and recreating", + id = replace_id.to_string(), state = state ); - stripe::Invoice::delete(client, &invoice.id).await?; - None - } - Some(stripe::InvoiceStatus::Draft) => { - tracing::debug!( - "Updating existing invoice {id}", - id = invoice.id.to_string() - ); - Some(invoice) - } - Some(stripe::InvoiceStatus::Open) => { - bail!( - "Found open invoice {id}. Pass --recreate-finalized to delete and recreate this invoice.", - id = invoice.id.to_string() - ) + stripe::Invoice::delete(client, replace_id).await?; } Some(status) => { bail!( - "Found invoice {id} in unsupported state {status}, skipping.", - id = invoice.id.to_string(), + "Invoice {id} changed to state {status} since classification, cannot delete.", + id = replace_id.to_string(), status = status ); } None => { bail!( "Unexpected missing status from invoice {id}", - id = invoice.id.to_string() + id = replace_id.to_string() ); } } - } else { - None - }; + } - let invoice = match maybe_invoice.clone() { - Some(inv) => inv, - None => { - let invoice = stripe::Invoice::create( - client, - stripe::CreateInvoice { - customer: Some(customer.id.to_owned()), - // Stripe timestamps are measured in _seconds_ since epoch - // Due date must be in the future. Bill net-30, so 30 days from today - due_date: match mode { - ChargeType::SendInvoice => Some((Utc::now() + Duration::days(30)).timestamp()), - ChargeType::AutoCharge => None + // Create or reuse the invoice + let invoice = if let Some(existing_id) = existing_invoice_id { + tracing::debug!( + "Updating existing invoice {id}", + id = existing_id.to_string() + ); + stripe::Invoice::retrieve(client, &existing_id, &[]).await? + } else { + let description_text = format!( + "Your Flow bill for the billing period between {date_start_human} - {date_end_human}. Tenant: {tenant}", + tenant = self.billed_prefix + ); + let invoice = stripe::Invoice::create( + client, + stripe::CreateInvoice { + customer: Some(customer.id.to_owned()), + due_date: match mode { + ChargeType::SendInvoice => { + Some((Utc::now() + Duration::days(30)).timestamp()) + } + ChargeType::AutoCharge => None, + }, + description: Some(description_text.as_str()), + collection_method: Some(match mode { + ChargeType::AutoCharge => stripe::CollectionMethod::ChargeAutomatically, + ChargeType::SendInvoice => stripe::CollectionMethod::SendInvoice, + }), + auto_advance: Some(false), + custom_fields: Some(vec![ + stripe::CreateInvoiceCustomFields { + name: "Billing Period Start".to_string(), + value: date_start_human.to_owned(), }, - description: Some( - format!( - "Your Flow bill for the billing period between {date_start_human} - {date_end_human}. Tenant: {tenant}", - tenant=self.billed_prefix.to_owned() - ) - .as_str(), + stripe::CreateInvoiceCustomFields { + name: "Billing Period End".to_string(), + value: date_end_human.to_owned(), + }, + ]), + metadata: Some(HashMap::from([ + ( + TENANT_METADATA_KEY.to_string(), + self.billed_prefix.to_owned(), ), - collection_method: Some(match mode { - ChargeType::AutoCharge => stripe::CollectionMethod::ChargeAutomatically, - ChargeType::SendInvoice => stripe::CollectionMethod::SendInvoice, - }), - auto_advance: Some(false), - custom_fields: Some(vec![ - stripe::CreateInvoiceCustomFields { - name: "Billing Period Start".to_string(), - value: date_start_human.to_owned(), - }, - stripe::CreateInvoiceCustomFields { - name: "Billing Period End".to_string(), - value: date_end_human.to_owned(), - }, - ]), - metadata: Some(HashMap::from([ - (TENANT_METADATA_KEY.to_string(), self.billed_prefix.to_owned()), - (INVOICE_TYPE_KEY.to_string(), invoice_type_str.to_owned()), - (BILLING_PERIOD_START_KEY.to_string(), date_start_repr), - (BILLING_PERIOD_END_KEY.to_string(), date_end_repr) - ])), - ..Default::default() - }, - ) - .await.context("Creating a new invoice")?; - tracing::debug!("Created a new invoice {id}", id = invoice.id); - invoice - } + (INVOICE_TYPE_KEY.to_string(), invoice_type_str.to_owned()), + (BILLING_PERIOD_START_KEY.to_string(), date_start_repr), + (BILLING_PERIOD_END_KEY.to_string(), date_end_repr), + ])), + ..Default::default() + }, + ) + .await + .context("Creating a new invoice")?; + tracing::debug!("Created a new invoice {id}", id = invoice.id); + invoice }; // Clear out line items from invoice, if there are any @@ -469,11 +605,10 @@ impl Invoice { ); } - // Let's double-check that the invoice total matches the desired total + // Re-fetch invoice and customer for fresh data (balance may have changed) let check_invoice = stripe::Invoice::retrieve(client, &invoice.id, &[]).await?; - - // Customers can have an invoice credit balance, so let's make sure we take that into account. - let credit_balance = customer.balance.unwrap_or(0); + let fresh_customer = stripe::Customer::retrieve(client, &customer.id, &[]).await?; + let credit_balance = fresh_customer.balance.unwrap_or(0); let expected = (self.subtotal + (diff.ceil() as i64) + credit_balance).max(0); @@ -485,10 +620,10 @@ impl Invoice { ) } - if maybe_invoice.is_some() { - return Ok(InvoiceResult::Updated); + if is_update { + Ok(InvoiceResult::Updated) } else { - return Ok(InvoiceResult::Created(self.payment_provider)); + Ok(InvoiceResult::Created(self.payment_provider)) } } } @@ -623,76 +758,85 @@ pub async fn do_publish_invoices(cmd: &PublishInvoice) -> anyhow::Result<()> { .or_default() += 1; }); - tracing::info!( - "Processing {usage} usage-based invoices, and {manual} manually-entered invoices.", - usage = invoice_type_counter - .remove(&InvoiceType::Final) - .unwrap_or_default(), - manual = invoice_type_counter - .remove(&InvoiceType::Manual) - .unwrap_or_default(), - ); + if cmd.dry_run { + tracing::info!( + "[DRY RUN] Classifying {usage} usage-based invoices and {manual} manually-entered invoices without making any changes to Stripe.", + usage = invoice_type_counter + .remove(&InvoiceType::Final) + .unwrap_or_default(), + manual = invoice_type_counter + .remove(&InvoiceType::Manual) + .unwrap_or_default(), + ); + } else { + tracing::info!( + "Processing {usage} usage-based invoices, and {manual} manually-entered invoices.", + usage = invoice_type_counter + .remove(&InvoiceType::Final) + .unwrap_or_default(), + manual = invoice_type_counter + .remove(&InvoiceType::Manual) + .unwrap_or_default(), + ); + } let invoice_futures: Vec<_> = invoices .iter() .map(|response| { let client = stripe_client.clone(); let db_pool = db_pool.clone(); + + let annotation = match response.invoice_type { + InvoiceType::Manual => Some(format!( + "[manual: {} - {}]", + response.date_start.format("%Y-%m-%d"), + response.date_end.format("%Y-%m-%d") + )), + _ => None, + }; + async move { - let res = response - .upsert_invoice( - &client, - &db_pool, - cmd.recreate_finalized, - cmd.charge_type, - ) + let action = response + .classify(&client, cmd.recreate_finalized) .await; - match res { + + match action { Err(err) => { let formatted = format!( - "Error publishing {invoice_type:?} invoice for {tenant}", + "Error classifying {invoice_type:?} invoice for {tenant}", tenant = response.billed_prefix, invoice_type = response.invoice_type ); - Err(anyhow::anyhow!(format!( - "{}: {err:#}", - formatted, - err = err - ))) + Err(anyhow::anyhow!("{formatted}: {err:#}")) } - Ok(res) => { + Ok(InvoiceAction::Skip { result, customer }) => { tracing::debug!( tenant = response.billed_prefix, invoice_type = format!("{:?}", response.invoice_type), subtotal = format!("${:.2}", response.subtotal as f64 / 100.0), "{}", - res.message() + result.message(cmd.dry_run) ); - match res { - InvoiceResult::Created(_) - | InvoiceResult::Updated - | InvoiceResult::Error => {} - // Remove any incorrectly created invoices that are now skipped for whatever reason - _ if cmd.clean_up => { - let task_res: Result<(), anyhow::Error> = async move { - let customer = match get_or_create_customer_for_tenant( - &client, - &db_pool, - response.billed_prefix.to_owned(), - false, - ) - .await? - { - Some(c) => c, - None => return Ok(()), - }; - - let customer_id = customer.id.to_string(); - - if let Some(invoice) = - response.get_stripe_invoice(&client, &customer_id).await? - { - if let Some(InvoiceStatus::Draft) = invoice.status { + + if cmd.clean_up { + let task_res: Result<(), anyhow::Error> = async { + let customer = match customer { + Some(c) => c, + None => return Ok(()), + }; + let customer_id = customer.id.to_string(); + + if let Some(invoice) = + response.get_stripe_invoice(&client, &customer_id).await? + { + if let Some(InvoiceStatus::Draft) = invoice.status { + if cmd.dry_run { + tracing::warn!( + tenant = response.billed_prefix.to_string(), + "[dry-run] Would delete stale draft invoice {}", + invoice.id + ); + } else { tracing::warn!( tenant = response.billed_prefix.to_string(), "Deleting draft invoice!" @@ -700,18 +844,67 @@ pub async fn do_publish_invoices(cmd: &PublishInvoice) -> anyhow::Result<()> { stripe::Invoice::delete(&client, &invoice.id).await?; } } - - Ok(()) } - .await; + Ok(()) + } + .await; - if let Err(e) = task_res { - tracing::warn!("Failed to check for or clear potential leaked draft invoices for {}, this is probably not a problem: {e:#}", response.billed_prefix.to_owned()); - } - }, - _ => {} + if let Err(e) = task_res { + tracing::warn!("Failed to check for or clear potential leaked draft invoices for {}, this is probably not a problem: {e:#}", response.billed_prefix.to_owned()); + } + } + + Ok((result, response.subtotal, response.billed_prefix.to_owned(), annotation)) + } + Ok(action) if cmd.dry_run => { + let result = match &action { + InvoiceAction::Create { replace: Some(id), .. } => { + tracing::info!( + tenant = response.billed_prefix, + "[dry-run] Would delete existing invoice {} and recreate", + id + ); + InvoiceResult::Created(response.payment_provider) + } + InvoiceAction::Create { .. } => { + InvoiceResult::Created(response.payment_provider) + } + InvoiceAction::Update { .. } => InvoiceResult::Updated, + InvoiceAction::Skip { .. } => unreachable!(), + }; + tracing::debug!( + tenant = response.billed_prefix, + invoice_type = format!("{:?}", response.invoice_type), + subtotal = format!("${:.2}", response.subtotal as f64 / 100.0), + "[dry-run] {}", + result.message(cmd.dry_run) + ); + Ok((result, response.subtotal, response.billed_prefix.to_owned(), annotation)) + } + Ok(action) => { + let res = response + .execute(&client, &db_pool, action, cmd.charge_type) + .await; + match res { + Err(err) => { + let formatted = format!( + "Error publishing {invoice_type:?} invoice for {tenant}", + tenant = response.billed_prefix, + invoice_type = response.invoice_type + ); + Err(anyhow::anyhow!("{formatted}: {err:#}")) + } + Ok(res) => { + tracing::debug!( + tenant = response.billed_prefix, + invoice_type = format!("{:?}", response.invoice_type), + subtotal = format!("${:.2}", response.subtotal as f64 / 100.0), + "{}", + res.message(cmd.dry_run) + ); + Ok((res, response.subtotal, response.billed_prefix.to_owned(), annotation)) + } } - Ok((res, response.subtotal, response.billed_prefix.to_owned())) } } } @@ -722,22 +915,22 @@ pub async fn do_publish_invoices(cmd: &PublishInvoice) -> anyhow::Result<()> { let total = invoice_futures.len(); - let collected: HashMap)> = + let collected: HashMap)>)> = futures::stream::iter(invoice_futures) .buffer_unordered(cmd.concurrency) .or_else(|(err, invoice)| async move { if !cmd.fail_fast { tracing::error!("[{}]: {err:#}", invoice.billed_prefix); - Ok((InvoiceResult::Error, 0, invoice.billed_prefix)) + Ok((InvoiceResult::Error, 0, invoice.billed_prefix, None)) } else { Err(err) } }) .try_fold( HashMap::new(), - |mut map, (res, subtotal, tenant)| async move { + |mut map, (res, subtotal, tenant, annotation)| async move { let overall_count = map.values().map(|(_, count, _)| *count).sum::() + 1; - let msg = res.message(); + let msg = res.message(cmd.dry_run); let (subtotal_sum, count_for_result_type, tenants) = map.entry(res).or_insert((0, 0, vec![])); @@ -745,7 +938,7 @@ pub async fn do_publish_invoices(cmd: &PublishInvoice) -> anyhow::Result<()> { *count_for_result_type += 1; tracing::info!("[{overall_count}/{total}, {tenant}]: {msg}"); - tenants.push((tenant, subtotal)); + tenants.push((tenant, subtotal, annotation)); Ok(map) }, ) @@ -755,7 +948,7 @@ pub async fn do_publish_invoices(cmd: &PublishInvoice) -> anyhow::Result<()> { tracing::info!( "[{:4} invoices]: {:70}${:.2}", count, - status.message(), + status.message(cmd.dry_run), *subtotal_agg as f64 / 100.0 ); let limit = match status { @@ -763,18 +956,24 @@ pub async fn do_publish_invoices(cmd: &PublishInvoice) -> anyhow::Result<()> { InvoiceResult::NoDataMoved | InvoiceResult::NoFullPipeline | InvoiceResult::LessThanMinimum - | InvoiceResult::FreeTier => 0, + | InvoiceResult::FreeTier + | InvoiceResult::AlreadyProcessed => 0, _ => 10, }; let sorted_tenants = tenants .iter() - .sorted_by(|(_, a), (_, b)| b.cmp(a)) + .sorted_by(|(_, a, _), (_, b, _)| b.cmp(a)) .collect_vec(); let (displayed_tenants, remainder_tenants) = sorted_tenants.split_at(limit.min(tenants.len())); - for (tenant, subtotal) in displayed_tenants { - tracing::info!(" - {:} ${:.2}", tenant, *subtotal as f64 / 100.0); + for (tenant, subtotal, annotation) in displayed_tenants { + match annotation { + Some(note) => { + tracing::info!(" - {} ${:.2} {}", tenant, *subtotal as f64 / 100.0, note) + } + None => tracing::info!(" - {} ${:.2}", tenant, *subtotal as f64 / 100.0), + } } if limit > 0 && remainder_tenants.len() > 0 { tracing::info!(" - ... {} Others", remainder_tenants.len(),); @@ -784,12 +983,11 @@ pub async fn do_publish_invoices(cmd: &PublishInvoice) -> anyhow::Result<()> { Ok(()) } -#[tracing::instrument(skip(client, db_client))] -async fn get_or_create_customer_for_tenant( +/// Read-only: search Stripe for an existing customer by tenant metadata. +#[tracing::instrument(skip(client))] +async fn find_customer( client: &stripe::Client, - db_client: &Pool, - tenant: String, - create: bool, + tenant: &str, ) -> anyhow::Result> { let customers = stripe_search::( client, @@ -802,18 +1000,33 @@ async fn get_or_create_customer_for_tenant( .await .context(format!("Searching for tenant {tenant}"))?; - let customer = if let Some(customer) = customers.into_iter().next() { + if let Some(customer) = customers.into_iter().next() { tracing::debug!("Found existing customer {id}", id = customer.id.to_string()); + Ok(Some(customer)) + } else { + Ok(None) + } +} + +/// Ensures a Stripe customer exists for this tenant and is ready for invoicing. +/// Finds an existing customer or creates a new one, then ensures the customer +/// has an email set (looking up the earliest admin on the tenant if needed). +#[tracing::instrument(skip(client, db_client))] +async fn ensure_customer_for_invoicing( + client: &stripe::Client, + db_client: &Pool, + tenant: &str, +) -> anyhow::Result { + let customer = if let Some(customer) = find_customer(client, tenant).await? { customer - } else if create { + } else { tracing::debug!("Creating new customer"); - let new_customer = stripe::Customer::create( + let description = format!("Represents the billing entity for Flow tenant '{tenant}'"); + stripe::Customer::create( client, stripe::CreateCustomer { - name: Some(tenant.as_str()), - description: Some( - format!("Represents the billing entity for Flow tenant '{tenant}'").as_str(), - ), + name: Some(tenant), + description: Some(description.as_str()), metadata: Some(HashMap::from([ (TENANT_METADATA_KEY.to_string(), tenant.to_string()), ( @@ -824,11 +1037,7 @@ async fn get_or_create_customer_for_tenant( ..Default::default() }, ) - .await?; - - new_customer - } else { - return Ok(None); + .await? }; if customer.email.is_none() { @@ -870,5 +1079,5 @@ async fn get_or_create_customer_for_tenant( ); } } - Ok(Some(customer)) + Ok(customer) } From c4f5be367ef17bfe943faed8f876b61b5f16a3cc Mon Sep 17 00:00:00 2001 From: Joseph Shearer Date: Fri, 10 Apr 2026 17:53:41 -0400 Subject: [PATCH 2/2] billing: always send manual invoices rather than auto-charging Customers' stored payment methods are for monthly usage overages. Manual bills (contracts, one-off charges, etc.) should be sent as invoices so the customer can decide how to pay, rather than being automatically charged to their payment method. * Override `charge_type` to `SendInvoice` for manual invoices during creation in `publish` * Switch manual invoices from `charge_automatically` to `send_invoice` during the send phase, even if the customer has a payment method on file --- crates/billing-integrations/src/publish.rs | 8 ++++++++ crates/billing-integrations/src/send.rs | 6 ++++-- crates/billing-integrations/src/stripe_utils.rs | 12 +++++++++++- 3 files changed, 23 insertions(+), 3 deletions(-) diff --git a/crates/billing-integrations/src/publish.rs b/crates/billing-integrations/src/publish.rs index 6f888095526..a1e73fa42f3 100644 --- a/crates/billing-integrations/src/publish.rs +++ b/crates/billing-integrations/src/publish.rs @@ -500,6 +500,14 @@ impl Invoice { } // Create or reuse the invoice + // Manual invoices should always be sent as invoices rather than + // charged to the customer's payment method. + let mode = if self.invoice_type == InvoiceType::Manual { + ChargeType::SendInvoice + } else { + mode + }; + let invoice = if let Some(existing_id) = existing_invoice_id { tracing::debug!( "Updating existing invoice {id}", diff --git a/crates/billing-integrations/src/send.rs b/crates/billing-integrations/src/send.rs index 9316a44e1cc..628d6072e7c 100644 --- a/crates/billing-integrations/src/send.rs +++ b/crates/billing-integrations/src/send.rs @@ -167,13 +167,15 @@ async fn update_draft_collection_methods( stripe_client: &Client, mut to_update: Vec, ) -> anyhow::Result> { - // Identify invoices that are `charge_automatically` but don't have a default payment method + // Identify invoices that need to be switched to `send_invoice`: + // - Manual invoices should always be sent as invoices, never auto-charged + // - Auto-charge invoices without a payment method on file must be sent as invoices let needs_update: HashSet = to_update .iter() .filter(|inv| { inv.collection_method().map_or(false, |cm| { cm == stripe::CollectionMethod::ChargeAutomatically - }) && !inv.has_cc() + }) && (inv.is_manual() || !inv.has_cc()) }) .map(|inv| inv.id().clone()) .collect::>(); diff --git a/crates/billing-integrations/src/stripe_utils.rs b/crates/billing-integrations/src/stripe_utils.rs index 36a719affbc..dbe0c1b09bc 100644 --- a/crates/billing-integrations/src/stripe_utils.rs +++ b/crates/billing-integrations/src/stripe_utils.rs @@ -1,4 +1,6 @@ -use crate::publish::{BILLING_PERIOD_END_KEY, BILLING_PERIOD_START_KEY, TENANT_METADATA_KEY}; +use crate::publish::{ + BILLING_PERIOD_END_KEY, BILLING_PERIOD_START_KEY, INVOICE_TYPE_KEY, TENANT_METADATA_KEY, +}; use num_format::{Locale, ToFormattedString}; use serde::{Serialize, de::DeserializeOwned}; use std::ops::{Deref, DerefMut}; @@ -122,6 +124,14 @@ impl Invoice { self.0.status.clone() } + pub fn is_manual(&self) -> bool { + self.0 + .metadata + .as_ref() + .and_then(|m| m.get(INVOICE_TYPE_KEY)) + .map_or(false, |v| v == "manual") + } + pub fn period_start(&self) -> Option { self.0 .metadata