Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 27 additions & 7 deletions src/embassy_net_modem/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@
// Licence: https://github.com/embassy-rs/embassy/blob/main/LICENSE-APACHE
// Source file: https://github.com/embassy-rs/embassy/blob/a8cb8a7fe1f594b765dee4cfc6ff3065842c7c6e/embassy-net-nrf91/src/context.rs

use core::cell::RefCell;
use core::net::{IpAddr, Ipv4Addr, Ipv6Addr};
use core::str::FromStr;

use at_commands::builder::CommandBuilder;
use at_commands::parser::CommandParser;
use embassy_sync::{blocking_mutex::raw::CriticalSectionRawMutex, mutex::Mutex};
use embassy_time::{Duration, Timer};
use heapless::Vec;

Expand All @@ -21,7 +21,8 @@ const DNS_VEC_SIZE: usize = 2;
pub struct Control<'a> {
control: super::Control<'a>,
cid: u8,
lte_link: Mutex<CriticalSectionRawMutex, Option<LteLink>>,
lte_link: RefCell<Option<LteLink>>,
needs_reconnect: RefCell<bool>,
}

/// Authentication parameters for the Packet Data Network (PDN).
Expand Down Expand Up @@ -269,7 +270,8 @@ impl<'a> Control<'a> {
Self {
control,
cid,
lte_link: Mutex::new(None),
lte_link: RefCell::new(None),
needs_reconnect: RefCell::new(false),
}
}

Expand All @@ -287,7 +289,9 @@ impl<'a> Control<'a> {
pub async fn configure(&self, config: &PdConfig<'_>, pin: Option<&[u8]>) -> Result<(), Error> {
let mut cmd: [u8; 256] = [0; 256];

if let Some(link) = self.lte_link.lock().await.take() {
let link = self.lte_link.borrow_mut().take();

if let Some(link) = link {
link.deactivate().await?;
}

Expand Down Expand Up @@ -369,6 +373,11 @@ impl<'a> Control<'a> {
}

async fn attached(&self) -> Result<bool, Error> {
// We don't have a link, meaning the modem is disabled.
if self.lte_link.borrow().is_none() {
return Ok(false);
}

let mut cmd: [u8; 256] = [0; 256];

let op = CommandBuilder::create_query(&mut cmd, true)
Expand Down Expand Up @@ -541,17 +550,26 @@ impl<'a> Control<'a> {

/// Disable modem
pub async fn disable(&self) -> Result<(), Error> {
if let Some(link) = self.lte_link.lock().await.take() {
let link = self.lte_link.borrow_mut().take();
if let Some(link) = link {
link.deactivate().await?;
};

// Also close the current socket
self.control.close_raw_socket().await;

*self.needs_reconnect.borrow_mut() = true;
Ok(())
}

/// Enable modem
pub async fn enable(&self) -> Result<(), Error> {
let mut cmd: [u8; 256] = [0; 256];

self.lte_link.lock().await.replace(LteLink::new().await?);
if self.lte_link.borrow().is_none() {
let link = LteLink::new().await?;
self.lte_link.borrow_mut().replace(link);
}

// Make modem survive PDN detaches
let op = CommandBuilder::create_set(&mut cmd, true)
Expand All @@ -567,17 +585,19 @@ impl<'a> Control<'a> {
}

/// Run a control loop for this context, ensuring that reaattach is handled.
/// This also allows to reconnect after calling `Control::disable()` then `Control::enable()`.
pub async fn run<F: Fn(&Status)>(&self, reattach: F) -> Result<(), Error> {
self.enable().await?;
let status = self.wait_attached().await?;
self.control.open_raw_socket().await;
reattach(&status);

loop {
if !self.attached().await? {
if !self.attached().await? || *self.needs_reconnect.borrow() {
self.control.close_raw_socket().await;
let status = self.wait_attached().await?;
self.control.open_raw_socket().await;
*self.needs_reconnect.borrow_mut() = false;
reattach(&status);
}
Timer::after(Duration::from_secs(10)).await;
Expand Down
59 changes: 46 additions & 13 deletions src/embassy_net_modem/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@
use core::cell::RefCell;
use core::mem::MaybeUninit;

use embassy_futures::select::{select, Either};
use embassy_net_driver_channel as ch;
use embassy_futures::select::{select3, Either3};
use embassy_net_driver_channel::{self as ch, driver::LinkState};
use embassy_sync::{blocking_mutex::raw::CriticalSectionRawMutex, signal::Signal};
use embassy_time::Timer;

pub mod context;
Expand All @@ -25,15 +26,17 @@ pub async fn new<'a>(state: &'a mut State) -> (NetDriver<'a>, Control<'a>, Runne
.inner
.write(RefCell::new(StateInner { net_socket: None }));

let control = Control { state: state_inner };
let control = Control {
state: state_inner,
close_signal: &state.close_signal,
};

let (ch_runner, device) = ch::new(&mut state.ch, ch::driver::HardwareAddress::Ip);
let state_ch = ch_runner.state_runner();
state_ch.set_link_state(ch::driver::LinkState::Up);

let runner = Runner {
ch: ch_runner,
state: state_inner,
close_signal: &state.close_signal,
};

(device, control, runner)
Expand All @@ -43,6 +46,7 @@ pub async fn new<'a>(state: &'a mut State) -> (NetDriver<'a>, Control<'a>, Runne
pub struct State {
ch: ch::State<MTU, 4, 4>,
inner: MaybeUninit<RefCell<StateInner>>,
close_signal: Signal<CriticalSectionRawMutex, bool>,
}

impl State {
Expand All @@ -51,6 +55,7 @@ impl State {
Self {
ch: ch::State::new(),
inner: MaybeUninit::uninit(),
close_signal: Signal::new(),
}
}
}
Expand All @@ -70,6 +75,7 @@ struct StateInner {
/// You can use this object to control the modem at runtime, such as running AT commands.
pub struct Control<'a> {
state: &'a RefCell<StateInner>,
close_signal: &'a Signal<CriticalSectionRawMutex, bool>,
}

pub(crate) const CAP_SIZE: usize = 256;
Expand All @@ -84,13 +90,20 @@ impl<'a> Control<'a> {
)
.await
.unwrap();

// Avoid closing the freshly created socket
self.close_signal.reset();
self.state.borrow_mut().net_socket = Some(socket);
}

async fn close_raw_socket(&self) {
let sock = self.state.borrow_mut().net_socket.take();
if let Some(s) = sock {
let socket = self.state.borrow_mut().net_socket.take();
// If the runner doesn't have the socket we deactivate it
if let Some(s) = socket {
s.deactivate().await.unwrap();
} else {
// If the runner has the socket we send it a signal to deactivate it
self.close_signal.signal(true);
}
}
/// Run an AT command.
Expand All @@ -107,22 +120,31 @@ impl<'a> Control<'a> {
pub struct Runner<'a> {
ch: ch::Runner<'a, MTU>,
state: &'a RefCell<StateInner>,
close_signal: &'a Signal<CriticalSectionRawMutex, bool>,
}

impl<'a> Runner<'a> {
/// Run the driver operation in the background.
///
/// You must run this in a background task, concurrently with all network operations.
pub async fn run(mut self) -> ! {
let mut previous_state = LinkState::Down;
loop {
let (_, mut rx_chan, mut tx_chan) = self.ch.borrow_split();
let (state_chan, mut rx_chan, mut tx_chan) = self.ch.borrow_split();
let net_socket = self.state.borrow_mut().net_socket.take();

let mut rx_buf = [0; 2048];

let token: crate::CancellationToken = Default::default();

if let Some(socket) = net_socket {
// Avoid acquiring the lock for every iteration
if previous_state == LinkState::Down {
// We have a socket, this means the link is up
state_chan.set_link_state(LinkState::Up);
previous_state = LinkState::Up;
}

let rx_fut = async {
let size = socket
.receive_with_cancellation(&mut rx_buf, &token)
Expand All @@ -132,15 +154,17 @@ impl<'a> Runner<'a> {
(size, buf)
};
let tx_fut = tx_chan.tx_buf();
match select(rx_fut, tx_fut).await {
Either::First((size, buf)) => {
match select3(rx_fut, tx_fut, self.close_signal.wait()).await {
Either3::First((size, buf)) => {
if size > 0 {
// Process received data
buf[..size].copy_from_slice(&rx_buf[..size]);
rx_chan.rx_done(size);
}
// Put the socket back
self.state.borrow_mut().net_socket.replace(socket);
}
Either::Second(buf) => {
Either3::Second(buf) => {
let size = buf.len();
let mut remaining = size;
while remaining > 0 {
Expand All @@ -151,11 +175,20 @@ impl<'a> Runner<'a> {
remaining -= size;
}
tx_chan.tx_done();
// Put the socket back
self.state.borrow_mut().net_socket.replace(socket);
}
Either3::Third(_) => {
// We need to close the socket
let _ = socket.deactivate().await;
// The socket has been consumed, self.state.net_socket is now None
}
}

self.state.borrow_mut().net_socket.replace(socket);
} else {
// We don't have a socket, link is down.
state_chan.set_link_state(LinkState::Down);
previous_state = LinkState::Down;

Timer::after_millis(100).await
}
}
Expand Down
Loading