use std::{ fmt::Display, net::{IpAddr, SocketAddr}, }; use anyhow::Result; use tokio::{ io::{AsyncBufReadExt, AsyncWriteExt, BufReader}, net::{TcpListener, TcpStream}, spawn, sync::mpsc::Sender, }; use crate::model::Mail; pub struct SmtpServer { listener: TcpListener, running: bool, server_name: String, } impl SmtpServer { pub async fn new(ip: [u8; 4], port: u16, server_name: &str) -> Result { Ok(Self { listener: TcpListener::bind((IpAddr::from(ip), port)).await?, running: false, server_name: String::from(server_name), }) } // TODO: trap SIGINT to stop server? pub async fn run(&mut self, tx_processor: Sender) -> Result<()> { self.running = true; while self.running { let (stream, addr) = match self.listener.accept().await { Ok(v) => v, Err(e) => match e.raw_os_error() { Some(libc::EMFILE | libc::ENFILE) => return Err(anyhow::Error::from(e)), Some(libc::ECONNABORTED) => { log::error!("{}", e); continue; } Some(libc::EAGAIN) => continue, _ => { log::warn!("{}", e); continue; } }, }; log::info!("Connected: {}", addr); let session = SessionHandler { addr, server_name: self.server_name.clone(), tx_processor: tx_processor.clone(), state: SessionState::default(), client_host: String::new(), from: None, to: Vec::new(), data: String::new(), }; spawn(session.run(stream)); } Ok(()) } } struct SessionHandler { addr: SocketAddr, server_name: String, tx_processor: Sender, state: SessionState, client_host: String, from: Option, to: Vec, data: String, } impl SessionHandler { async fn run(mut self, mut stream: TcpStream) -> Result<()> { let (r, mut writer) = stream.split(); let mut reader = BufReader::new(r); let mut buffer = String::new(); writer .write_all(Reply::Ready(&self.server_name).to_string().as_bytes()) .await?; loop { buffer.clear(); // Technically emails could use any arbitrary encoding, not necessarily ASCII or UTF-8. // If such encoding is ever encountered it would error on this line. // // For now we won't bother to check as it is unlikely, but it should be handled before // deploying in production. // // TODO: Handle non UTF-8 encoding. if reader.read_line(&mut buffer).await? == 0 { log::warn!("Connection closed by {}", self.addr); break; } log::debug!("Received '{}' from '{}'", buffer.trim(), self.addr); // In this state the server is not expecting commands as the input is part of the email // message if self.state == SessionState::AwaitingMailInput { if buffer.starts_with(".\r\n") { self.process_mail().await?; writer .write_all(Reply::Completed("Ok").to_string().as_bytes()) .await?; continue; } self.data.push_str(&buffer); continue; } let command = match Command::try_from(buffer.as_str()) { Err(_) => { writer .write_all(Reply::InvalidCommand.to_string().as_bytes()) .await?; continue; } Ok(v) => v, }; log::debug!("COMMAND: {}", command); let res = self.apply(command).await?; log::debug!("RESPONSE: {}", res); writer.write_all(res.to_string().as_bytes()).await?; if res == Reply::EndTransmission { break; } } // Should wait for the client before closing but meh. Any transaction has already gone // through anyway so... waiting for the client to close the connection as part of the // protocol was a dumb choice, the client can't issue new commands anyway. stream.shutdown().await?; Ok(()) } async fn apply(&mut self, command: Command<'_>) -> Result> { match &command { Command::HELO(hostname) => self.helo(hostname).await, Command::MAIL(from) => self.mail(from).await, Command::RCPT(to) => self.rcpt(to).await, Command::DATA => self.start_data().await, Command::QUIT => self.quit().await, } } /// HELO resets buffers async fn helo(&mut self, hostname: &str) -> Result> { self.client_host = String::from(hostname); self.state = SessionState::Normal; self.from = None; self.to.clear(); Ok(Reply::Completed(&self.server_name)) } /// TODO: Validate email address /// MAIL resets buffers async fn mail(&mut self, from: &str) -> Result> { if from.is_empty() { return Ok(Reply::InvalidParameter); } self.from = Some(String::from(from)); self.to.clear(); self.state = SessionState::MailTransaction; Ok(Reply::Completed("Ok")) } /// TODO: Validate email addresses /// Only after having started a mail transaction async fn rcpt(&mut self, to: &str) -> Result> { if self.state != SessionState::MailTransaction { return Ok(Reply::BadSequence); } if to.is_empty() { return Ok(Reply::InvalidParameter); } self.to.push(String::from(to)); Ok(Reply::Completed("Ok")) } /// Only after having started a mail transaction async fn start_data(&mut self) -> Result> { if self.state != SessionState::MailTransaction { return Ok(Reply::BadSequence); } // Should I return a more meaningful string with this error code? if self.from.is_none() || self.to.is_empty() { return Ok(Reply::InvalidCommand); } self.state = SessionState::AwaitingMailInput; Ok(Reply::StartMailInput) } async fn process_mail(&mut self) -> Result> { if self.from.is_none() || self.to.is_empty() || self.data.trim().is_empty() { return Ok(Reply::InvalidCommand); } let mail = Mail { from: self.from.as_ref().unwrap().clone(), recipient: self.to.clone(), data: self.data.clone(), }; self.tx_processor.send(mail).await?; self.state = SessionState::Normal; self.from = None; self.to.clear(); self.data.clear(); Ok(Reply::Completed("Ok")) } async fn quit(&self) -> Result> { Ok(Reply::EndTransmission) } } #[derive(PartialEq, Default)] enum SessionState { #[default] WaitingHelo, MailTransaction, AwaitingMailInput, Normal, } #[derive(PartialEq)] enum Reply<'a> { Ready(&'a str), Completed(&'a str), StartMailInput, InvalidCommand, InvalidParameter, BadSequence, EndTransmission, } impl std::fmt::Display for Reply<'_> { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { Reply::Ready(hostname) => write!(f, "220 {}\r\n", hostname), Reply::Completed(hostname) => write!(f, "250 {}\r\n", hostname), Reply::InvalidCommand => write!(f, "500 Invalid command\r\n"), Reply::InvalidParameter => write!(f, "501 Parameter error\r\n"), Reply::BadSequence => write!(f, "503 Bad sequence of commands\r\n"), Reply::StartMailInput => write!(f, "354 .\r\n"), Reply::EndTransmission => write!(f, "221 OK\r\n"), } } } enum Command<'a> { HELO(&'a str), MAIL(&'a str), RCPT(&'a str), DATA, QUIT, } impl<'a> TryFrom<&'a str> for Command<'a> { type Error = anyhow::Error; fn try_from(value: &'a str) -> std::result::Result { if value.len() >= 5 && value.to_lowercase().starts_with("helo ") { return Ok(Command::HELO(&value[5..])); } if value.len() >= 11 && value.to_lowercase().starts_with("mail from:<") && value.contains('>') { let from = &value[11..value.find('>').unwrap()]; return Ok(Command::MAIL(from)); } if value.len() >= 9 && value.to_lowercase().starts_with("rcpt to:<") && value.contains('>') { let to = &value[9..value.find('>').unwrap()]; return Ok(Command::RCPT(to)); } if value.to_lowercase().starts_with("data") { return Ok(Command::DATA); } if value.to_lowercase().starts_with("quit") { return Ok(Command::QUIT); } Err(anyhow::format_err!("Invalid command")) } } impl Display for Command<'_> { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { Command::HELO(host) => write!(f, "HELO {}\r\n", host), Command::MAIL(from) => write!(f, "MAIL FROM:<{}>\r\n", from), Command::RCPT(to) => write!(f, "RCPT TO:<{}>\r\n", to), Command::DATA => write!(f, "DATA\r\n"), Command::QUIT => write!(f, "QUIT\r\n"), } } } mod tests { use std::net::SocketAddr; use tokio::{ io::{AsyncBufReadExt, AsyncWriteExt, BufReader}, net::TcpSocket, spawn, sync::mpsc::channel, }; use crate::smtp_server::{Command, SmtpServer}; const DOMAIN: &str = "mail.example.local"; const CLIENT_HOST: &str = "exploit"; #[tokio::test] async fn receive_email() { env_logger::init(); let mut server = SmtpServer::new([127, 0, 0, 1], 9090, DOMAIN).await.unwrap(); let (tx, mut rx) = channel(5); let j = spawn(async move { server.run(tx).await.unwrap(); log::info!("IM DONE"); }); let sock = TcpSocket::new_v4().unwrap(); let mut s = sock .connect(SocketAddr::new([127, 0, 0, 1].into(), 9090)) .await .unwrap(); let (reader, mut writer) = s.split(); let mut reader = BufReader::new(reader); let mut buf = String::new(); assert!(reader.read_line(&mut buf).await.unwrap() > 0); assert_eq!(buf.trim(), format!("220 {}", DOMAIN)); writer .write_all(Command::HELO(CLIENT_HOST).to_string().as_bytes()) .await .unwrap(); buf.clear(); assert!(reader.read_line(&mut buf).await.unwrap() > 0); assert_eq!(buf.trim(), format!("250 {}", DOMAIN)); writer .write_all(Command::MAIL("mroik@poul.org").to_string().as_bytes()) .await .unwrap(); buf.clear(); assert!(reader.read_line(&mut buf).await.unwrap() > 0); assert_eq!(buf.trim(), "250 Ok"); writer .write_all(Command::RCPT("mroik@delayed.space").to_string().as_bytes()) .await .unwrap(); buf.clear(); assert!(reader.read_line(&mut buf).await.unwrap() > 0); assert_eq!(buf.trim(), "250 Ok"); writer .write_all(Command::DATA.to_string().as_bytes()) .await .unwrap(); buf.clear(); assert!(reader.read_line(&mut buf).await.unwrap() > 0); assert_eq!(buf.trim(), "354 ."); writer.write_all(b"From: mroik@poul.org\r\n").await.unwrap(); writer .write_all(b"To: mroik@delayed.space\r\n") .await .unwrap(); writer .write_all(b"Date: Tue, 14 Apr 2026 23:00:00 +0200\r\n") .await .unwrap(); writer .write_all(b"Subject: This is a test\r\n") .await .unwrap(); writer.write_all(b"\r\n").await.unwrap(); writer .write_all(b"Hey, this is a test. You can ignore this!\r\n") .await .unwrap(); writer.write_all(b"\r\n").await.unwrap(); writer.write_all(b"Mroik\r\n").await.unwrap(); writer.write_all(b".\r\n").await.unwrap(); buf.clear(); assert!(reader.read_line(&mut buf).await.unwrap() > 0); assert_eq!(buf.trim(), "250 Ok"); let lines = [ "From: mroik@poul.org", "To: mroik@delayed.space", "Date: Tue, 14 Apr 2026 23:00:00 +0200", "Subject: This is a test", "", "Hey, this is a test. You can ignore this!", "", "Mroik", ]; let email = rx.recv().await.unwrap(); assert_eq!(email.from, "mroik@poul.org"); assert_eq!(email.recipient.len(), 1); assert_eq!(email.recipient[0], "mroik@delayed.space"); assert_eq!(email.data.trim(), lines.join("\r\n").trim()); // Close connection writer .write_all(Command::QUIT.to_string().as_bytes()) .await .unwrap(); buf.clear(); assert!(reader.read_line(&mut buf).await.unwrap() > 0); assert_eq!(buf.trim(), "221 OK"); buf.clear(); assert_eq!(reader.read_line(&mut buf).await.unwrap(), 0); s.shutdown().await.unwrap(); // This is necessary because the server continues to run even after closing a connection as // it continues to listen for new connections. // // In order to ensure that this doesn't affect other tests we have to abort it. j.abort(); } }