diff options
| author | Mroik <mroik@delayed.space> | 2026-04-11 05:14:41 +0200 |
|---|---|---|
| committer | Mroik <mroik@delayed.space> | 2026-04-13 06:56:11 +0200 |
| commit | 203f02b88e75a9b604b27922aecaceb31c44d20a (patch) | |
| tree | 99dc845bc8d6a5fd3a4ede70fa70b0d58390a60e /src | |
| parent | ffc0ad7c86408193b213c46cbca5c0d8b60f8632 (diff) | |
Replace std socket IO with tokio socket IO
Since the TcpListener (and related) operations are blocking, they would
prevent other tasks from running by monopolizing CPU time.
This should've been done from the start but I'm not used to directly
using low level primitives in an async environment as I would normally
use a high level library for this, so I forgot.
Signed-off-by: Mroik <mroik@delayed.space>
Diffstat (limited to 'src')
| -rw-r--r-- | src/smtp_server.rs | 44 |
1 files changed, 25 insertions, 19 deletions
diff --git a/src/smtp_server.rs b/src/smtp_server.rs index e6c0efc..2805081 100644 --- a/src/smtp_server.rs +++ b/src/smtp_server.rs @@ -1,10 +1,12 @@ -use std::{ - io::{BufRead, BufReader, Write}, - net::{IpAddr, SocketAddr, TcpListener, TcpStream}, -}; +use std::net::{IpAddr, SocketAddr}; use anyhow::Result; -use tokio::{spawn, sync::mpsc::Sender}; +use tokio::{ + io::{AsyncBufReadExt, AsyncWriteExt, BufReader}, + net::{TcpListener, TcpStream}, + spawn, + sync::mpsc::Sender, +}; use crate::model::Mail; @@ -16,9 +18,9 @@ pub struct SmtpServer { } impl SmtpServer { - pub fn new(ip: [u8; 4], port: u16) -> Result<Self> { + pub async fn new(ip: [u8; 4], port: u16) -> Result<Self> { Ok(Self { - listener: TcpListener::bind((IpAddr::from(ip), port))?, + listener: TcpListener::bind((IpAddr::from(ip), port)).await?, running: false, }) } @@ -27,7 +29,7 @@ impl SmtpServer { pub async fn run(&mut self, tx_processor: Sender<Mail>) -> Result<()> { self.running = true; while self.running { - let (stream, addr) = match self.listener.accept() { + let (stream, addr) = match self.listener.accept().await { Ok(v) => v, Err(e) => match e.raw_os_error() { Some(libc::EMFILE | libc::ENFILE) => return Err(anyhow::Error::from(e)), @@ -68,20 +70,22 @@ struct SessionHandler { } impl SessionHandler { - async fn run(mut self, stream: TcpStream) -> Result<()> { - let mut writer = stream.try_clone()?; - let mut r = BufReader::new(&stream); + async fn run(mut self, mut stream: TcpStream) -> Result<()> { + let (r, mut writer) = stream.split(); + let mut reader = BufReader::new(r); let mut buffer = String::new(); - writer.write_all( - Reply::Ready(String::from(SERVER_NAME)) - .to_string() - .as_bytes(), - )?; + writer + .write_all( + Reply::Ready(String::from(SERVER_NAME)) + .to_string() + .as_bytes(), + ) + .await?; loop { buffer.clear(); - if r.read_line(&mut buffer)? == 0 { + if reader.read_line(&mut buffer).await? == 0 { break; } log::debug!("Received '{}' from '{}'", buffer.trim(), self.addr); @@ -100,13 +104,15 @@ impl SessionHandler { let command = match Command::try_from(buffer.as_str()) { Err(_) => { - writer.write_all(Reply::InvalidCommand.to_string().as_bytes())?; + writer + .write_all(Reply::InvalidCommand.to_string().as_bytes()) + .await?; continue; } Ok(v) => v, }; let res = self.apply(command).await?; - writer.write_all(res.to_string().as_bytes())?; + writer.write_all(res.to_string().as_bytes()).await?; } log::info!("Connection closed by {}", self.addr); |
