aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Cargo.toml2
-rw-r--r--src/main.rs1
-rw-r--r--src/model.rs7
-rw-r--r--src/process_mail.rs23
-rw-r--r--src/smtp_server.rs19
5 files changed, 47 insertions, 5 deletions
diff --git a/Cargo.toml b/Cargo.toml
index c42a1ee..570c437 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -8,4 +8,4 @@ anyhow = "1.0.102"
env_logger = "0.11.10"
log = "0.4.29"
rusqlite = "0.39.0"
-tokio = { version = "1.51.0", features = ["macros", "rt-multi-thread"] }
+tokio = { version = "1.51.0", features = ["macros", "rt-multi-thread", "sync"] }
diff --git a/src/main.rs b/src/main.rs
index fcce4d8..20ace78 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,5 +1,6 @@
mod database;
mod model;
+mod process_mail;
mod smtp_server;
fn main() {
diff --git a/src/model.rs b/src/model.rs
index 7fcf80f..37a855f 100644
--- a/src/model.rs
+++ b/src/model.rs
@@ -346,6 +346,13 @@ impl SubscriptionQuery<'_> {
}
}
+// TODO
+pub struct Mail {
+ pub from: String,
+ pub recipient: Vec<String>,
+ pub data: String,
+}
+
#[cfg(test)]
mod tests {
use anyhow::Result;
diff --git a/src/process_mail.rs b/src/process_mail.rs
new file mode 100644
index 0000000..5a22ce1
--- /dev/null
+++ b/src/process_mail.rs
@@ -0,0 +1,23 @@
+use anyhow::Result;
+use tokio::sync::mpsc::{Receiver, Sender};
+
+use crate::model::Mail;
+
+struct MailProcessor {
+ rx: Receiver<Mail>,
+ tx: Sender<Mail>,
+}
+
+// TODO: Store first then forward. On complete forward remove from db, otherwise save db with the
+// emails to send to.
+impl MailProcessor {
+ async fn run(&mut self) -> Result<()> {
+ // TODO: Check againts self.rx.is_closed() instead to stop the program before consuming all
+ // of the queue. Store the emails not yet processed and restore the queue upon startup.
+ while let Some(mail) = self.rx.recv().await {
+ todo!()
+ }
+
+ Ok(())
+ }
+}
diff --git a/src/smtp_server.rs b/src/smtp_server.rs
index 2795ecf..bacf8eb 100644
--- a/src/smtp_server.rs
+++ b/src/smtp_server.rs
@@ -4,7 +4,9 @@ use std::{
};
use anyhow::Result;
-use tokio::spawn;
+use tokio::{spawn, sync::mpsc::Sender};
+
+use crate::model::Mail;
const SERVER_NAME: &str = "";
@@ -22,12 +24,13 @@ impl SmtpServer {
}
// TODO: trap SIGINT to stop server?
- pub async fn run(&mut self) -> Result<()> {
+ pub async fn run(&mut self, tx_processor: Sender<Mail>) -> Result<()> {
self.running = true;
while self.running {
let (stream, addr) = self.listener.accept()?;
let session = SessionHandler {
addr,
+ tx_processor: tx_processor.clone(),
state: SessionState::default(),
client_host: String::new(),
from: None,
@@ -42,6 +45,7 @@ impl SmtpServer {
struct SessionHandler {
addr: SocketAddr,
+ tx_processor: Sender<Mail>,
state: SessionState,
client_host: String,
from: Option<String>,
@@ -158,18 +162,25 @@ impl SessionHandler {
Ok(Reply::StartMailInput)
}
- // TODO: Forward and store email
async fn process_mail(&mut self) -> Result<Reply> {
if self.from.is_none() || self.to.is_empty() || self.data.trim().is_empty() {
return Ok(Reply::InvalidCommand);
}
+ let mail = Mail {
+ from: self.from.as_ref().unwrap().clone(),
+ recipient: self.to.clone(),
+ data: self.data.clone(),
+ };
+
+ self.tx_processor.send(mail).await?;
+
self.state = SessionState::Normal;
self.from = None;
self.to.clear();
self.data.clear();
- todo!()
+ Ok(Reply::Completed(String::from("Ok")))
}
}
XMR address: 854DmXNrxULU3ZFJVs4Wc8PFhbq29RhqHhY8W6cdWrtFN3qmooKyyeYPcDzZTNRxphhJ5UzASQfAdEMwSteVqymk28aLhqj