diff options
| author | Mroik <mroik@delayed.space> | 2026-04-10 00:21:25 +0200 |
|---|---|---|
| committer | Mroik <mroik@delayed.space> | 2026-04-13 06:56:11 +0200 |
| commit | b336fd39444e8089d35a7a2bd4c0c3e8228c6c36 (patch) | |
| tree | 945f577f4c8b1d8c9e58b10d39e38832ec77391c /src | |
| parent | 0ff548961a68213487faa85b7b01cf717bb27268 (diff) | |
Add mail processing scaffolding
After receiving the email we don't want to process it in the same thread
as soon as we can, instead we queue it to a MailProcessor. This allows
us to be more flexible with the pipeline and reduce the amount of
concurrency for the database connection. This also helps with writing
possible fences around resource consumption.
Signed-off-by: Mroik <mroik@delayed.space>
Diffstat (limited to 'src')
| -rw-r--r-- | src/main.rs | 1 | ||||
| -rw-r--r-- | src/model.rs | 7 | ||||
| -rw-r--r-- | src/process_mail.rs | 23 | ||||
| -rw-r--r-- | src/smtp_server.rs | 19 |
4 files changed, 46 insertions, 4 deletions
diff --git a/src/main.rs b/src/main.rs index fcce4d8..20ace78 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,5 +1,6 @@ mod database; mod model; +mod process_mail; mod smtp_server; fn main() { diff --git a/src/model.rs b/src/model.rs index 7fcf80f..37a855f 100644 --- a/src/model.rs +++ b/src/model.rs @@ -346,6 +346,13 @@ impl SubscriptionQuery<'_> { } } +// TODO +pub struct Mail { + pub from: String, + pub recipient: Vec<String>, + pub data: String, +} + #[cfg(test)] mod tests { use anyhow::Result; diff --git a/src/process_mail.rs b/src/process_mail.rs new file mode 100644 index 0000000..5a22ce1 --- /dev/null +++ b/src/process_mail.rs @@ -0,0 +1,23 @@ +use anyhow::Result; +use tokio::sync::mpsc::{Receiver, Sender}; + +use crate::model::Mail; + +struct MailProcessor { + rx: Receiver<Mail>, + tx: Sender<Mail>, +} + +// TODO: Store first then forward. On complete forward remove from db, otherwise save db with the +// emails to send to. +impl MailProcessor { + async fn run(&mut self) -> Result<()> { + // TODO: Check againts self.rx.is_closed() instead to stop the program before consuming all + // of the queue. Store the emails not yet processed and restore the queue upon startup. + while let Some(mail) = self.rx.recv().await { + todo!() + } + + Ok(()) + } +} diff --git a/src/smtp_server.rs b/src/smtp_server.rs index 2795ecf..bacf8eb 100644 --- a/src/smtp_server.rs +++ b/src/smtp_server.rs @@ -4,7 +4,9 @@ use std::{ }; use anyhow::Result; -use tokio::spawn; +use tokio::{spawn, sync::mpsc::Sender}; + +use crate::model::Mail; const SERVER_NAME: &str = ""; @@ -22,12 +24,13 @@ impl SmtpServer { } // TODO: trap SIGINT to stop server? - pub async fn run(&mut self) -> Result<()> { + pub async fn run(&mut self, tx_processor: Sender<Mail>) -> Result<()> { self.running = true; while self.running { let (stream, addr) = self.listener.accept()?; let session = SessionHandler { addr, + tx_processor: tx_processor.clone(), state: SessionState::default(), client_host: String::new(), from: None, @@ -42,6 +45,7 @@ impl SmtpServer { struct SessionHandler { addr: SocketAddr, + tx_processor: Sender<Mail>, state: SessionState, client_host: String, from: Option<String>, @@ -158,18 +162,25 @@ impl SessionHandler { Ok(Reply::StartMailInput) } - // TODO: Forward and store email async fn process_mail(&mut self) -> Result<Reply> { if self.from.is_none() || self.to.is_empty() || self.data.trim().is_empty() { return Ok(Reply::InvalidCommand); } + let mail = Mail { + from: self.from.as_ref().unwrap().clone(), + recipient: self.to.clone(), + data: self.data.clone(), + }; + + self.tx_processor.send(mail).await?; + self.state = SessionState::Normal; self.from = None; self.to.clear(); self.data.clear(); - todo!() + Ok(Reply::Completed(String::from("Ok"))) } } |
