From 203f02b88e75a9b604b27922aecaceb31c44d20a Mon Sep 17 00:00:00 2001 From: Mroik Date: Sat, 11 Apr 2026 05:14:41 +0200 Subject: Replace std socket IO with tokio socket IO Since the TcpListener (and related) operations are blocking, they would prevent other tasks from running by monopolizing CPU time. This should've been done from the start but I'm not used to directly using low level primitives in an async environment as I would normally use a high level library for this, so I forgot. Signed-off-by: Mroik --- src/smtp_server.rs | 44 +++++++++++++++++++++++++------------------- 1 file changed, 25 insertions(+), 19 deletions(-) (limited to 'src') diff --git a/src/smtp_server.rs b/src/smtp_server.rs index e6c0efc..2805081 100644 --- a/src/smtp_server.rs +++ b/src/smtp_server.rs @@ -1,10 +1,12 @@ -use std::{ - io::{BufRead, BufReader, Write}, - net::{IpAddr, SocketAddr, TcpListener, TcpStream}, -}; +use std::net::{IpAddr, SocketAddr}; use anyhow::Result; -use tokio::{spawn, sync::mpsc::Sender}; +use tokio::{ + io::{AsyncBufReadExt, AsyncWriteExt, BufReader}, + net::{TcpListener, TcpStream}, + spawn, + sync::mpsc::Sender, +}; use crate::model::Mail; @@ -16,9 +18,9 @@ pub struct SmtpServer { } impl SmtpServer { - pub fn new(ip: [u8; 4], port: u16) -> Result { + pub async fn new(ip: [u8; 4], port: u16) -> Result { Ok(Self { - listener: TcpListener::bind((IpAddr::from(ip), port))?, + listener: TcpListener::bind((IpAddr::from(ip), port)).await?, running: false, }) } @@ -27,7 +29,7 @@ impl SmtpServer { pub async fn run(&mut self, tx_processor: Sender) -> Result<()> { self.running = true; while self.running { - let (stream, addr) = match self.listener.accept() { + let (stream, addr) = match self.listener.accept().await { Ok(v) => v, Err(e) => match e.raw_os_error() { Some(libc::EMFILE | libc::ENFILE) => return Err(anyhow::Error::from(e)), @@ -68,20 +70,22 @@ struct SessionHandler { } impl SessionHandler { - async fn run(mut self, stream: TcpStream) -> Result<()> { - let mut writer = stream.try_clone()?; - let mut r = BufReader::new(&stream); + async fn run(mut self, mut stream: TcpStream) -> Result<()> { + let (r, mut writer) = stream.split(); + let mut reader = BufReader::new(r); let mut buffer = String::new(); - writer.write_all( - Reply::Ready(String::from(SERVER_NAME)) - .to_string() - .as_bytes(), - )?; + writer + .write_all( + Reply::Ready(String::from(SERVER_NAME)) + .to_string() + .as_bytes(), + ) + .await?; loop { buffer.clear(); - if r.read_line(&mut buffer)? == 0 { + if reader.read_line(&mut buffer).await? == 0 { break; } log::debug!("Received '{}' from '{}'", buffer.trim(), self.addr); @@ -100,13 +104,15 @@ impl SessionHandler { let command = match Command::try_from(buffer.as_str()) { Err(_) => { - writer.write_all(Reply::InvalidCommand.to_string().as_bytes())?; + writer + .write_all(Reply::InvalidCommand.to_string().as_bytes()) + .await?; continue; } Ok(v) => v, }; let res = self.apply(command).await?; - writer.write_all(res.to_string().as_bytes())?; + writer.write_all(res.to_string().as_bytes()).await?; } log::info!("Connection closed by {}", self.addr); -- cgit v1.3