Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 52 additions & 34 deletions src/chat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2753,8 +2753,8 @@ async fn prepare_send_msg(
chat_id.unarchive_if_not_muted(context, msg.state).await?;
}
chat.prepare_msg_raw(context, msg, update_msg_id).await?;

let row_ids = create_send_msg_jobs(context, msg)
let resend_to_new = false;
let row_ids = create_send_msg_jobs(context, msg, resend_to_new)
.await
.context("Failed to create send jobs")?;
if !row_ids.is_empty() {
Expand Down Expand Up @@ -2824,7 +2824,11 @@ async fn render_mime_message_and_pre_message(
/// Returns row ids if `smtp` table jobs were created or an empty `Vec` otherwise.
///
/// The caller has to interrupt SMTP loop or otherwise process new rows.
pub(crate) async fn create_send_msg_jobs(context: &Context, msg: &mut Message) -> Result<Vec<i64>> {
async fn create_send_msg_jobs(
context: &Context,
msg: &mut Message,
resend_to_new: bool,
) -> Result<Vec<i64>> {
let cmd = msg.param.get_cmd();
if cmd == SystemMessage::GroupNameChanged || cmd == SystemMessage::GroupDescriptionChanged {
msg.chat_id
Expand Down Expand Up @@ -2940,35 +2944,41 @@ pub(crate) async fn create_send_msg_jobs(context: &Context, msg: &mut Message) -
msg.param.remove(Param::GuaranteeE2ee);
}
msg.subject.clone_from(&rendered_msg.subject);
// Sort the message to the bottom. Employ `msgs_index7` to compute `timestamp`.
context
.sql
.execute(
"
UPDATE msgs SET
timestamp=(
SELECT MAX(timestamp) FROM msgs INDEXED BY msgs_index7 WHERE
-- From `InFresh` to `OutMdnRcvd` inclusive except `OutDraft`.
state IN(10,13,16,18,20,24,26,28) AND
hidden IN(0,1) AND
chat_id=? AND
id<=?
),
pre_rfc724_mid=?, subject=?, param=?
WHERE id=?
",
(
msg.chat_id,
msg.id,
&msg.pre_rfc724_mid,
&msg.subject,
msg.param.to_string(),
msg.id,
),
)
.await?;
if !resend_to_new {
// Sort the message to the bottom. Employ `msgs_index7` to compute `timestamp`.
context
.sql
.execute(
"
UPDATE msgs SET
timestamp=(
SELECT MAX(timestamp) FROM msgs INDEXED BY msgs_index7 WHERE
-- From `InFresh` to `OutMdnRcvd` inclusive except `OutDraft`.
state IN(10,13,16,18,20,24,26,28) AND
hidden IN(0,1) AND
chat_id=? AND
id<=?
),
pre_rfc724_mid=?, subject=?, param=?
WHERE id=?
",
(
msg.chat_id,
msg.id,
&msg.pre_rfc724_mid,
&msg.subject,
msg.param.to_string(),
msg.id,
),
)
.await?;
}

let chunk_size = context.get_max_smtp_rcpt_to().await?;
let msg_id = match resend_to_new {
true => MsgId::new(u32::MAX),
false => msg.id,
};
let trans_fn = |t: &mut rusqlite::Transaction| {
let mut row_ids = Vec::<i64>::new();

Expand All @@ -2989,15 +2999,15 @@ WHERE id=?
&pre_msg.rfc724_mid,
&recipients_chunk,
&pre_msg.message,
msg.id,
msg_id,
))?;
row_ids.push(row_id.try_into()?);
}
let row_id = stmt.execute((
&rendered_msg.rfc724_mid,
&recipients_chunk,
&rendered_msg.message,
msg.id,
msg_id,
))?;
row_ids.push(row_id.try_into()?);
}
Expand Down Expand Up @@ -4609,7 +4619,11 @@ pub async fn forward_msgs_2ctx(
chat.prepare_msg_raw(ctx_dst, &mut msg, None).await?;

curr_timestamp += 1;
if !create_send_msg_jobs(ctx_dst, &mut msg).await?.is_empty() {
let resend_to_new = false;
if !create_send_msg_jobs(ctx_dst, &mut msg, resend_to_new)
.await?
.is_empty()
{
ctx_dst.scheduler.interrupt_smtp().await;
}
created_msgs.push(msg.id);
Expand Down Expand Up @@ -4767,7 +4781,11 @@ pub(crate) async fn resend_msgs_ex(
if let Some(to_fingerprint) = &to_fingerprint {
msg.param.set(Param::Arg4, to_fingerprint.clone());
}
if create_send_msg_jobs(context, &mut msg).await?.is_empty() {
let resend_to_new = to_fingerprint.is_some();
if create_send_msg_jobs(context, &mut msg, resend_to_new)
.await?
.is_empty()
{
continue;
}

Expand Down
25 changes: 25 additions & 0 deletions src/chat/chat_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3049,6 +3049,31 @@ async fn test_broadcast_resend_to_new_member() -> Result<()> {
Ok(())
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_broadcast_resend_failed_msg_to_new_member() -> Result<()> {
let mut tcm = TestContextManager::new();
let alice = &tcm.alice().await;
let bob = &tcm.bob().await;
let fiona = &tcm.fiona().await;
let alice_bc_id = create_broadcast(alice, "bc".to_string()).await?;
let qr = get_securejoin_qr(alice, Some(alice_bc_id)).await.unwrap();

tcm.exec_securejoin_qr(bob, alice, &qr).await;
let alice_msg_id = alice.send_text(alice_bc_id, "text").await.sender_msg_id;
let mut msg = Message::load_from_db(alice, alice_msg_id).await?;
message::set_msg_failed(alice, &mut msg, "error").await?;
let fiona_bc_id = tcm.exec_securejoin_qr(fiona, alice, &qr).await;
let resent_msg = alice.pop_sent_msg().await;
let fiona_msg = fiona.recv_msg(&resent_msg).await;
assert_eq!(fiona_msg.chat_id, fiona_bc_id);
assert_eq!(fiona_msg.text, "text");
assert_eq!(
alice_msg_id.get_state(alice).await?,
MessageState::OutFailed
);
Ok(())
}

/// - Alice has multiple devices
/// - Alice creates a broadcast and sends a message into it
/// - Alice's second device sees the broadcast
Expand Down
Loading