better threading (hopefully)

This commit is contained in:
Florian Stecker 2023-01-01 00:21:59 +01:00
parent 5696a1e7ab
commit bf15e2811f
2 changed files with 157 additions and 98 deletions

View File

@ -1,13 +1,14 @@
#![feature(io_error_more)]
use rustls::{OwnedTrustAnchor, ClientConfig, RootCertStore, ClientConnection}; use rustls::{OwnedTrustAnchor, ClientConfig, RootCertStore, ClientConnection};
use anyhow::{Result as AResult, bail}; use anyhow::{Result as AResult, bail};
use std::net::{TcpStream, ToSocketAddrs}; use std::net::{TcpStream, ToSocketAddrs};
use std::io::{self, ErrorKind, Read, Write}; use std::io::{self, ErrorKind, Read, Write, Error as IOError};
use std::process::Command; use std::process::Command;
use std::time::{Duration, SystemTime}; use std::time::{Duration, SystemTime};
use std::path::PathBuf; use std::path::PathBuf;
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use std::thread; use std::thread;
use std::mem;
use clap::Parser; use clap::Parser;
#[derive(Parser, Clone)] #[derive(Parser, Clone)]
@ -42,14 +43,6 @@ pub struct Cli {
verbose: u8, verbose: u8,
} }
#[derive(PartialEq, Eq, Debug)]
enum State {
Unauthenticated,
Authenticated,
Inbox,
Idling
}
/// runs the command if one of the following conditions is satisfied: /// runs the command if one of the following conditions is satisfied:
/// - `new_mail` is true /// - `new_mail` is true
/// - the command hasn't been run yet /// - the command hasn't been run yet
@ -60,7 +53,6 @@ enum State {
/// the command was run, and `cli.interval` minus the time elapsed since the last run /// the command was run, and `cli.interval` minus the time elapsed since the last run
/// in case the command was not run. /// in case the command was not run.
/// If `cli.interval` is `None`, the return value will be `None`. /// If `cli.interval` is `None`, the return value will be `None`.
pub fn run_command_if_needed(cli: &Cli, lastrun_mutex: &Mutex<Option<SystemTime>>, new_mail: bool) -> Option<Duration> { pub fn run_command_if_needed(cli: &Cli, lastrun_mutex: &Mutex<Option<SystemTime>>, new_mail: bool) -> Option<Duration> {
// locks the mutex until the end of the function, also preventing that the command is run concurrently // locks the mutex until the end of the function, also preventing that the command is run concurrently
let mut last = lastrun_mutex.lock().unwrap(); let mut last = lastrun_mutex.lock().unwrap();
@ -112,27 +104,147 @@ pub fn run_command_if_needed(cli: &Cli, lastrun_mutex: &Mutex<Option<SystemTime>
}); });
} }
pub fn run(cli: &Cli) -> AResult<()> { #[derive(Debug)]
// a mutex to avoid running the command concurrently struct Status {
let mutex_mainthread = Arc::new(Mutex::new(())); connected: bool,
let mutex_subthread = Arc::clone(&mutex_mainthread); last_run: SystemTime
}
if let Some(interval) = cli.interval { const CONNECTION_LOST_ERRORS: &[ErrorKind] = &[
ErrorKind::Interrupted,
ErrorKind::WouldBlock, // when a read times out
];
const CANT_CONNECT_ERRORS: &[ErrorKind] = &[
ErrorKind::ConnectionAborted,
ErrorKind::ConnectionReset,
ErrorKind::NotConnected,
ErrorKind::NetworkUnreachable,
ErrorKind::HostUnreachable,
];
pub fn run() -> AResult<()> {
let cli = Cli::parse();
let connection_status: Arc<Mutex<Status>> = Arc::new(Mutex::new(
Status { connected: false, last_run: SystemTime::now() }
));
// if interval is given, spawn a thread to take care of the regular calls
let timer_handle = cli.interval.map(|interval| {
let cmd = cli.command.clone(); let cmd = cli.command.clone();
let connection_status_subthread = Arc::clone(&connection_status);
thread::spawn(move || { thread::spawn(move || {
loop { let interval_duration = Duration::from_secs(interval);
thread::sleep(Duration::from_secs(interval)); let mut wait_time = interval_duration;
let lock = mutex_subthread.lock().unwrap();
println!("Interval timer expired, running command ...");
Command::new(cmd.as_os_str())
.output()
.expect("command execution failed");
println!("Command finished.");
mem::drop(lock);
}
});
}
loop {
// we just want to sleep, but park_timeout() allows
// interruption by the main thread
thread::park_timeout(wait_time);
let mut status = connection_status_subthread.lock().unwrap();
// check here if we should really run, and how long to wait for if not
// interval, SystemTime::now(), status.connected, status.last_run -> wait_time
let elapsed = status.last_run.elapsed().unwrap_or_default();
let run = status.connected && elapsed >= interval_duration;
println!("time = {:?}, run = {}", SystemTime::now(), run);
if run {
println!("Interval timer expired, running command ...");
Command::new(cmd.as_os_str())
.output()
.expect("command execution failed");
println!("Command finished.");
status.last_run = SystemTime::now();
wait_time = interval_duration;
} else {
wait_time = interval_duration - elapsed;
}
}
})
});
// what to do as soon as we're connected
let connect_callback = || {
if let Some(th) = &timer_handle {
connection_status.lock().unwrap().connected = true;
// we unpark the thread after reconnecting since a common cause of
// disconnects is suspend, after which the sleep timer might not do what
// we want
th.thread().unpark();
}
};
// what to do when the server tells us we got an email
let mail_callback = || {
let mut status = connection_status.lock().unwrap();
println!("New email, running command ...");
Command::new(cli.command.as_os_str())
.output()
.expect("command execution failed");
println!("Command finished.");
status.last_run = SystemTime::now();
};
// reconnect in an infinite loop
loop {
return match connect_and_idle(&cli, connect_callback , mail_callback) {
Ok(_) => Ok(()),
Err(err) => match err.downcast_ref::<IOError>() {
Some(io_err) if CONNECTION_LOST_ERRORS.contains(&io_err.kind()) => {
connection_status.lock().unwrap().connected = false;
let secs_to_reconnect = 10;
println!("Connection lost, reconnecting in {secs_to_reconnect} seconds");
thread::sleep(Duration::from_secs(secs_to_reconnect));
continue;
},
Some(io_err) if CANT_CONNECT_ERRORS.contains(&io_err.kind()) => {
connection_status.lock().unwrap().connected = false;
let secs_to_reconnect = 10*60;
println!("Cannot connect currently, retrying in {secs_to_reconnect} seconds");
thread::sleep(Duration::from_secs(secs_to_reconnect));
continue;
},
Some(io_err) => {
println!("{:?}", io_err.kind());
Err(err)
}
_ => Err(err)
}
}
}
}
#[derive(PartialEq, Eq, Debug)]
enum ImapState {
Unauthenticated,
Authenticated,
Inbox,
Idling
}
/// establish a connection to IMAP server, log in, run IDLE command, and wait
/// for mail to arrive
pub fn connect_and_idle<F: Fn(), G: Fn()>(cli: &Cli,
connected_callback: F,
mail_callback: G)
-> AResult<()> {
let tls_config = ClientConfig::builder() let tls_config = ClientConfig::builder()
.with_safe_defaults() .with_safe_defaults()
.with_root_certificates(RootCertStore { .with_root_certificates(RootCertStore {
@ -153,7 +265,7 @@ pub fn run(cli: &Cli) -> AResult<()> {
.map_err(|e|io::Error::new(ErrorKind::NotConnected, e.to_string()))? .map_err(|e|io::Error::new(ErrorKind::NotConnected, e.to_string()))?
.collect::<Vec<_>>(); .collect::<Vec<_>>();
let mut socket = TcpStream::connect(addrs.as_slice())?; let mut socket = TcpStream::connect(addrs.as_slice())?;
let mut state = State::Unauthenticated; let mut state = ImapState::Unauthenticated;
socket.set_read_timeout(Some(Duration::from_secs(10*60)))?; socket.set_read_timeout(Some(Duration::from_secs(10*60)))?;
@ -177,7 +289,7 @@ pub fn run(cli: &Cli) -> AResult<()> {
for response in responses { for response in responses {
if cli.verbose > 0 { if cli.verbose > 0 {
if state == State::Unauthenticated { if state == ImapState::Unauthenticated {
if let Some(suite) = tls_client.negotiated_cipher_suite() { if let Some(suite) = tls_client.negotiated_cipher_suite() {
println!("negotiated cipher suite: {:?}", suite); println!("negotiated cipher suite: {:?}", suite);
} }
@ -187,33 +299,29 @@ pub fn run(cli: &Cli) -> AResult<()> {
} }
match state { match state {
State::Unauthenticated => if response.starts_with(b"* OK") { ImapState::Unauthenticated => if response.starts_with(b"* OK") {
let request = format!("A001 login {} {}\r\n", cli.username, cli.password); let request = format!("A001 login {} {}\r\n", cli.username, cli.password);
tls_client.writer().write(request.as_bytes())?; tls_client.writer().write(request.as_bytes())?;
state = State::Authenticated; state = ImapState::Authenticated;
}, },
State::Authenticated => if response.starts_with(b"A001 OK") { ImapState::Authenticated => if response.starts_with(b"A001 OK") {
tls_client.writer().write(b"A002 select inbox\r\n")?; tls_client.writer().write(b"A002 select inbox\r\n")?;
state = State::Inbox; state = ImapState::Inbox;
} else if response.starts_with(b"A001") { } else if response.starts_with(b"A001") {
bail!("The server rejected authentication"); bail!("The server rejected authentication");
}, },
State::Inbox => if response.starts_with(b"A002 OK") { ImapState::Inbox => if response.starts_with(b"A002 OK") {
tls_client.writer().write(b"A003 idle\r\n")?; tls_client.writer().write(b"A003 idle\r\n")?;
state = State::Idling; state = ImapState::Idling;
connected_callback();
// notify timer thread that we're live
} else if response.starts_with(b"A002") { } else if response.starts_with(b"A002") {
bail!("Selecting inbox failed"); bail!("Selecting inbox failed");
}, },
State::Idling => if response.starts_with(b"+ idling") { ImapState::Idling => if response.starts_with(b"+ idling") {
println!("Connected and idling ..."); println!("Connected and idling ...");
} else if response.starts_with(b"*") && response.ends_with(b"EXISTS") { } else if response.starts_with(b"*") && response.ends_with(b"EXISTS") {
let lock = mutex_mainthread.lock().unwrap(); mail_callback();
println!("New email, running command ...");
Command::new(cli.command.as_os_str())
.output()
.expect("command execution failed");
println!("Command finished.");
mem::drop(lock);
} }
} }
} }
@ -224,13 +332,3 @@ pub fn run(cli: &Cli) -> AResult<()> {
Ok(()) Ok(())
} }
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_dns_lookup() {
}
}

View File

@ -1,49 +1,10 @@
#![feature(io_error_more)]
use anyhow::Result as AResult; use anyhow::Result as AResult;
use std::io::{Error as IOError, ErrorKind};
use std::thread;
use std::time::Duration;
use clap::Parser;
const CONNECTION_LOST_ERRORS: &[ErrorKind] = &[
ErrorKind::Interrupted,
ErrorKind::WouldBlock, // when a read times out
];
const CANT_CONNECT_ERRORS: &[ErrorKind] = &[
ErrorKind::ConnectionAborted,
ErrorKind::ConnectionReset,
ErrorKind::NotConnected,
ErrorKind::NetworkUnreachable,
ErrorKind::HostUnreachable,
];
fn main() -> AResult<()> { fn main() -> AResult<()> {
let cli = imapidle::Cli::parse(); if let Err(err) = imapidle::run() {
println!("{err:?}");
loop { Err(err)
return match imapidle::run(&cli) { } else {
Ok(_) => Ok(()), Ok(())
Err(err) => match err.downcast_ref::<IOError>() {
Some(io_err) if CONNECTION_LOST_ERRORS.contains(&io_err.kind()) => {
let secs_to_reconnect = 10;
println!("Connection lost, reconnecting in {secs_to_reconnect} seconds");
thread::sleep(Duration::from_secs(secs_to_reconnect));
continue;
},
Some(io_err) if CANT_CONNECT_ERRORS.contains(&io_err.kind()) => {
let secs_to_reconnect = 10*60;
println!("Cannot connect currently, retrying in {secs_to_reconnect} seconds");
thread::sleep(Duration::from_secs(secs_to_reconnect));
continue;
},
Some(io_err) => {
println!("{:?}", io_err.kind());
Err(err)
}
_ => Err(err)
}
}
} }
} }