diff --git a/flush_ls/src/main.rs b/flush_ls/src/main.rs deleted file mode 100644 index 61777a27..00000000 --- a/flush_ls/src/main.rs +++ /dev/null @@ -1,384 +0,0 @@ -/* -Copyright (c) 2015, Los Alamos National Security, LLC -All rights reserved. - -Copyright 2015. Los Alamos National Security, LLC. This software was -produced under U.S. Government contract DE-AC52-06NA25396 for Los -Alamos National Laboratory (LANL), which is operated by Los Alamos -National Security, LLC for the U.S. Department of Energy. The -U.S. Government has rights to use, reproduce, and distribute this -software. NEITHER THE GOVERNMENT NOR LOS ALAMOS NATIONAL SECURITY, -LLC MAKES ANY WARRANTY, EXPRESS OR IMPLIED, OR ASSUMES ANY LIABILITY -FOR THE USE OF THIS SOFTWARE. If software is modified to produce -derivative works, such modified software should be clearly marked, so -as not to confuse it with the version available from LANL. - -Additionally, redistribution and use in source and binary forms, with -or without modification, are permitted provided that the following -conditions are met: 1. Redistributions of source code must retain the -above copyright notice, this list of conditions and the following -disclaimer. - -2. Redistributions in binary form must reproduce the above copyright -notice, this list of conditions and the following disclaimer in the -documentation and/or other materials provided with the distribution. -3. Neither the name of Los Alamos National Security, LLC, Los Alamos -National Laboratory, LANL, the U.S. Government, nor the names of its -contributors may be used to endorse or promote products derived from -this software without specific prior written permission. - -THIS SOFTWARE IS PROVIDED BY LOS ALAMOS NATIONAL SECURITY, LLC AND -CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, -BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND -FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. 
IN NO EVENT SHALL LOS -ALAMOS NATIONAL SECURITY, LLC OR CONTRIBUTORS BE LIABLE FOR ANY -DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL -DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE -GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS -INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER -IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR -OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF -ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - ------ -NOTE: ------ -MarFS is released under the BSD license. - -MarFS was reviewed and released by LANL under Los Alamos Computer Code -identifier: LA-CC-15-039. - -MarFS uses libaws4c for Amazon S3 object communication. The original -version is at https://aws.amazon.com/code/Amazon-S3/2601 and under the -LGPL license. LANL added functionality to the original work. The -original work plus LANL contributions is found at -https://github.com/jti-lanl/aws4c. - -GNU licenses can be found at http://www.gnu.org/licenses/. -*/ - -use clap::Parser; -use std::collections::BTreeMap; -use std::fs; -use std::mem; -use std::path::PathBuf; -use std::sync::{Arc, mpsc}; -use std::thread; -use std::time::{Duration, SystemTime}; - -// path components to leaves of marfs data tree -const PATH_SEGS: &[&str] = &[ - "pod", - "block", - "cap", - "scat" -]; - -// use BTreeMap to remove duplicates and sort on keys -// do not manually allocate - use parse_user_thresholds -type Thresholds = BTreeMap; - -/** - * Select how old files are allowed to be given system - * utilization and thresholds - * - * Example: - * thresholds: - * 10 -> 60 - * 20 -> 1 - * - * If the utilization is less than or equal to 10%, files older than - * 60 seconds should be flushed. If the utilization is greater than - * 10% and less than or equal to 20%, files older than 1 second should - * be flushed. - * - * TODO: Change to BTreeMap::upper_bound once btree_cursors is merged. 
- * - * @param thresholds mapping of thresholds to file age limits - * @param utilization system utilization - * @return file age limit in seconds - */ -fn util2age(thresholds: &Thresholds, utilization: u8) -> u64 { - // return the age associated with the first threshold - // that is greater than or equal to the utilization - for (threshold, age) in thresholds.iter() { - if *threshold >= utilization { - return *age; - } - } - - // this line does double duty as a compiler silencer - // and as an invalid utilization value check - panic!("Error: Utilization percentage not found"); -} - -/** - * Process files under /pod[0-9]+/block[0-9]+/cap[0-9]+/scat[0-9]+/ - * if (reftime - file.atime) > age, print the file's path - * - * @param path /pod[0-9]+/block[0-9]+/cap[0-9]+/scat[0-9]+/ - * @param reftime a timestamp to compare atimes with - * @param thresholds mapping of thresholds to file age limits - * @return number of leaf files processed - */ -fn process_leaf(path: PathBuf, reftime: Arc, thresholds: Arc) -> u64 { - let entries = match fs::read_dir(&path) { - Ok(list) => list, - Err(error) => { - eprintln!("Warning: Could not read_dir {}: {}", path.display(), error); - return 0; - }, - }; - - // get the leaf's utilization - let util = unsafe { - use errno::errno; - use libc; - use std::ffi::CString; - use std::mem; - - let path_cstr = CString::new(path.display().to_string()).unwrap(); - let mut vfs_st: libc::statvfs = mem::zeroed(); - - if libc::statvfs(path_cstr.as_ptr(), &mut vfs_st as *mut libc::statvfs) < 0 { - println!("Warning: Getting utilization for {} failed: {}", path.display(), errno()); - return 0; - } - - (100 - vfs_st.f_bfree * 100 / vfs_st.f_blocks) as u8 - }; - - // figure out the file age limit - let age = util2age(&thresholds, util); - - let mut count = 0; - - // loop through leaf directory and find files older than the limit - for entry_res in entries { - let entry = match entry_res { - Ok(entry) => entry, - Err(error) => { - eprintln!("Warning: Could 
not get entry: {}", error); - continue; - }, - }; - - if let Ok(entry_type) = entry.file_type() { - let child = entry.path(); - if entry_type.is_file() { - if let Ok(st) = child.metadata() { - if let Ok(atime) = st.accessed() { - if let Ok(dur) = reftime.duration_since(atime) { - // older than allowed file age - print path for flushing - if dur.as_secs() > age { - println!("{}", child.display()); - count += 1; - } - } - } - } - } else { - eprintln!("Warning: {} is not a file", child.display()); - } - } - } - - return count; -} - -/** - * Find directories matching /pod[0-9]+/block[0-9]+/cap[0-9]+/scat[0-9]+ - * - * @param path and lower - * @param level which path segment is currently being processed - * @param reftime a timestamp to compare atimes with - * @param thresholds mapping of thresholds to file age limits - * @param tx push thread handles here - * @return number of paths that had the expected path segment pattern - */ -fn process_non_leaf(path: PathBuf, level: usize, - reftime: Arc, - thresholds: Arc, - tx: mpsc::Sender>) -> u64 { - if level == 4 { - // panic on > 4? 
- let _ = tx.send(thread::spawn(move || { - process_leaf(path, reftime, thresholds) - })); - return 0; - } - - let entries = match fs::read_dir(&path) { - Ok(list) => list, - Err(error) => { - eprintln!("Warning: Could not read_dir {}: {}", path.display(), error); - return 0; - }, - }; - - let expected = PATH_SEGS[level]; - let len = expected.len(); - - let mut count = 0; - - // find paths that match current marfs path segment - for entry_res in entries { - let entry = match entry_res { - Ok(entry) => entry, - Err(error) => { - eprintln!("Warning: Could not get entry: {}", error); - continue; - }, - }; - - let child = entry.path(); - - if child.is_dir() == false { - continue; - } - - // make sure current basename has expected path segment - if let Some(basename) = child.file_name() { - if len < basename.len() { - if let Some(basename_str) = basename.to_str() { - if &basename_str[0..len] != expected { - continue; - } - - if let Err(_) = &basename_str[len..].parse::() { - continue; - } - - let tx_clone = tx.clone(); - let reftime_arc = reftime.clone(); - let thresholds_arc = thresholds.clone(); - - let _ = tx.send(thread::spawn(move || { - process_non_leaf(child, level + 1, reftime_arc, thresholds_arc, tx_clone) - })); - - count += 1; - } - } - } - } - - count -} - -/** - * Recurse down to /pod[0-9]+/block[0-9]+/cap[0-9]+/scat[0-9]+ - * and find files that are older than the provided age - * - * @param dal_root - * @param reftime a timestamp to compare atimes with - * @param thresholds mapping of thresholds to file age limits - */ -fn print_flushable_in_dal(dal_root: &PathBuf, reftime: &SystemTime, thresholds: &Thresholds) { - let (tx, rx) = mpsc::channel(); - - let tx_clone = tx.clone(); - let path = dal_root.clone(); - let reftime_arc = Arc::new(*reftime); - let thresholds_arc = Arc::new(thresholds.clone()); - - let _ = tx.send(std::thread::spawn(move || { - process_non_leaf(path, 0, reftime_arc, thresholds_arc, tx_clone) - })); - - mem::drop(tx); - - while let 
Ok(thread) = rx.recv() { - let _ = thread.join(); - } -} - -/** - * Convert , strings from the commandline - * to integers and insert them into a map. - * - * Utilization is an integer representing utilization percentage. - * An integer is required because rust does not have an Ord trait - * defined for f32 and f64. https://stackoverflow.com/a/69117941/341683 - * - * Age is integer number of seconds since Jan 1, 1970 00:00:00 UTC. - * - * Example: - * - * ... ... 10,60 20,1 - * - * @param args a vector of strings parsed by clap - * @param delim separator between utilization and age - * @return a mapping of utilizations to file age limits - */ -fn parse_user_thresholds(args: &Vec, delim: char) -> Thresholds { - let mut thresholds = Thresholds::from([ - (0, u64::MAX), // if utilization is at 0%, don't flush anything - (100, 0), // if utilization is at 100%, flush everything - ]); - - for arg in args { - match arg.split_once(delim) { - Some((util_str, age_str)) => { - let util = match util_str.parse::() { - Ok(val) => val, - Err(error) => panic!("Error: Bad utilization string: '{}': {}", util_str, error), - }; - - if util > 100 { - panic!("Error: Utilization can be between 0% and 100%. Got '{}'", util); - } - - let age = match age_str.parse::() { - Ok(val) => val, - Err(error) => panic!("Error: Bad age string: '{}': {}", age_str, error), - }; - - thresholds.insert(util, age); - }, - None => panic!("Error: Bad , string: '{}'", arg), - } - } - - // check for monotonically decreasing file ages - let mut prev = thresholds.first_key_value(); - for (utilization, age) in thresholds.iter().skip(1) { - if age >= prev.unwrap().1 { - panic!("Error: File age must be strictly monotonically decreasing. 
Found {},{} -> {},{}", - prev.unwrap().0, prev.unwrap().1, utilization, age); - } - - prev = Some((&utilization, &age)) - } - - thresholds -} - -#[derive(Parser, Debug)] -#[command()] -struct Args { - #[arg(help="DAL root path")] - root: PathBuf, - - #[arg(help="Reference Timestamp (Seconds Since Epoch)")] - reftime: u64, - - #[arg(help="Comma separated utilization percentage (integer) and age (integer seconds) thresholds")] - thresholds: Vec, -} - -fn main() { - let args = Args::parse(); - - // get reference timestamp - let reftime = SystemTime::UNIX_EPOCH + Duration::from_secs(args.reftime); - - // convert user input to a map - let thresholds = parse_user_thresholds(&args.thresholds, ','); - - // find files older than age - print_flushable_in_dal(&args.root, &reftime, &thresholds); -} - -#[cfg(test)] -mod tests; diff --git a/flush_ls/src/tests.rs b/flush_ls/src/tests.rs deleted file mode 100644 index 4fe0aa03..00000000 --- a/flush_ls/src/tests.rs +++ /dev/null @@ -1,337 +0,0 @@ -/* -Copyright (c) 2015, Los Alamos National Security, LLC -All rights reserved. - -Copyright 2015. Los Alamos National Security, LLC. This software was -produced under U.S. Government contract DE-AC52-06NA25396 for Los -Alamos National Laboratory (LANL), which is operated by Los Alamos -National Security, LLC for the U.S. Department of Energy. The -U.S. Government has rights to use, reproduce, and distribute this -software. NEITHER THE GOVERNMENT NOR LOS ALAMOS NATIONAL SECURITY, -LLC MAKES ANY WARRANTY, EXPRESS OR IMPLIED, OR ASSUMES ANY LIABILITY -FOR THE USE OF THIS SOFTWARE. If software is modified to produce -derivative works, such modified software should be clearly marked, so -as not to confuse it with the version available from LANL. - -Additionally, redistribution and use in source and binary forms, with -or without modification, are permitted provided that the following -conditions are met: 1. 
Redistributions of source code must retain the -above copyright notice, this list of conditions and the following -disclaimer. - -2. Redistributions in binary form must reproduce the above copyright -notice, this list of conditions and the following disclaimer in the -documentation and/or other materials provided with the distribution. -3. Neither the name of Los Alamos National Security, LLC, Los Alamos -National Laboratory, LANL, the U.S. Government, nor the names of its -contributors may be used to endorse or promote products derived from -this software without specific prior written permission. - -THIS SOFTWARE IS PROVIDED BY LOS ALAMOS NATIONAL SECURITY, LLC AND -CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, -BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND -FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL LOS -ALAMOS NATIONAL SECURITY, LLC OR CONTRIBUTORS BE LIABLE FOR ANY -DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL -DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE -GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS -INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER -IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR -OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF -ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - ------ -NOTE: ------ -MarFS is released under the BSD license. - -MarFS was reviewed and released by LANL under Los Alamos Computer Code -identifier: LA-CC-15-039. - -MarFS uses libaws4c for Amazon S3 object communication. The original -version is at https://aws.amazon.com/code/Amazon-S3/2601 and under the -LGPL license. LANL added functionality to the original work. The -original work plus LANL contributions is found at -https://github.com/jti-lanl/aws4c. - -GNU licenses can be found at http://www.gnu.org/licenses/. 
-*/ - -#[cfg(test)] -mod tests { - use crate::*; - use tempfile::{TempDir, tempdir}; - - fn setup_dirs() -> (TempDir, PathBuf) { - // DAL root - let root = tempdir().unwrap(); - - // create intermediate directories - let mut path = PathBuf::from(root.path().to_path_buf().to_owned()); - - for path_seg in PATH_SEGS { - let numbered = PathBuf::from(String::from(*path_seg) + "0"); - path = path.join(numbered); - let _ = fs::create_dir(&path).unwrap(); - } - - // return root to prevent destructor call - (root, path) - } - - fn setup_file(path: &PathBuf, name: &str, atime: SystemTime) { - // create pod/block/cap/scat/* - let mut filename = path.to_owned(); - filename = filename.join(name); - - let utime = fs::FileTimes::new().set_accessed(atime); - - let file = fs::File::create(&filename).unwrap(); - file.set_times(utime).unwrap(); - } - - #[test] - fn process_leaf_good() { - let reftime = SystemTime::now(); - let (_root, path) = setup_dirs(); - - // create 2 files - setup_file(&path, "0", reftime - Duration::from_secs(1)); - setup_file(&path, "1", reftime - Duration::from_secs(24 * 60 * 60)); - - // find files older than 2 seconds - { - let thresholds = Thresholds::from([(100, 2)]); - assert_eq!(process_leaf(path.clone(), Arc::new(reftime), Arc::new(thresholds)), 1); - } - - // find files older than 0 seconds - { - let thresholds = Thresholds::from([(100, 0)]); - assert_eq!(process_leaf(path.clone(), Arc::new(reftime), Arc::new(thresholds)), 2); - } - } - - #[test] - fn process_leaf_file() { - let root = tempdir().unwrap(); - - // process_leaf should take in a directory path, not a file path - let mut path = PathBuf::from(root.path().to_path_buf().to_owned()); - path.push("file"); - let _ = fs::File::create(&path).unwrap(); - - let thresholds = Thresholds::from([(100, 0)]); - assert_eq!(process_leaf(path.clone(), Arc::new(SystemTime::now()), Arc::new(thresholds)), 0); - } - - #[test] - fn process_leaf_dir() { - let root = tempdir().unwrap(); - - // path should only have 
files under it - let mut path = PathBuf::from(root.path().to_path_buf().to_owned()); - path.push("dir"); - let _ = fs::create_dir(&path).unwrap(); - path.pop(); - - let thresholds = Thresholds::from([(100, 0)]); - assert_eq!(process_leaf(path.clone(), Arc::new(SystemTime::now()), Arc::new(thresholds)), 0); - } - - #[test] - fn process_non_leaf_bad_dir() { - let root = tempdir().unwrap(); - let (tx, _) = mpsc::channel(); - - // the provided path does not exist - let mut path = PathBuf::from(root.path().to_path_buf().to_owned()); - path.push("non-existant-path"); - - let children = process_non_leaf(path, 0, Arc::new(SystemTime::now()), Arc::new(Thresholds::new()), tx); - assert_eq!(children, 0); - } - - #[test] - fn process_non_leaf_bad_dir_name() { - let root = tempdir().unwrap(); - let (tx, _) = mpsc::channel(); - let level = 0; - - // subdirectory does exist, but is not an expected path segment - let mut path = PathBuf::from(root.path().to_path_buf().to_owned()); - path.push(String::from("a") + PATH_SEGS[level]); - let _ = fs::create_dir(&path).unwrap(); - path.pop(); - - let children = process_non_leaf(path, level, Arc::new(SystemTime::now()), Arc::new(Thresholds::new()), tx); - assert_eq!(children, 0); - } - - #[test] - fn process_non_leaf_partial_match() { - let root = tempdir().unwrap(); - let (tx, _) = mpsc::channel(); - let level = 0; - - // subdirectory prefix matches, but suffix is not an integer - let mut path = PathBuf::from(root.path().to_path_buf().to_owned()); - path.push(String::from(PATH_SEGS[level]) + "a"); - let _ = fs::create_dir(&path).unwrap(); - path.pop(); - - let children = process_non_leaf(path, level, Arc::new(SystemTime::now()), Arc::new(Thresholds::new()), tx); - assert_eq!(children, 0); - } - - #[test] - fn process_non_leaf_file() { - let root = tempdir().unwrap(); - let (tx, _) = mpsc::channel(); - - // there should only be subdirectories, not files - let mut path = PathBuf::from(root.path().to_path_buf().to_owned()); - 
path.push("file"); - let _ = fs::File::create(&path).unwrap(); - path.pop(); - - let children = process_non_leaf(path, 0, Arc::new(SystemTime::now()), Arc::new(Thresholds::new()), tx); - assert_eq!(children, 0); - } - - #[test] - fn print_flushable_in_dal_good() { - let (root, _) = setup_dirs(); - let thresholds = parse_user_thresholds(&vec!["0,2".to_string()], ','); - print_flushable_in_dal(&root.path().to_path_buf(), &SystemTime::UNIX_EPOCH, &thresholds); - } - - #[test] - fn user_threshold_good_single() { - let args = vec![ - "1,1".to_string(), - ]; - - let thresholds = parse_user_thresholds(&args, ','); - assert_eq!(thresholds.len(), 3); - assert_eq!(thresholds.get(&1), Some(&1)); - } - - #[test] - fn user_threshold_good_multiple() { - let args = vec![ - "1,2".to_string(), - "2,1".to_string(), - ]; - - let thresholds = parse_user_thresholds(&args, ','); - assert_eq!(thresholds.len(), 4); - assert_eq!(thresholds.get(&1), Some(&2)); - assert_eq!(thresholds.get(&2), Some(&1)); - } - - #[test] - fn user_threshold_good_repeat() { - let args = vec![ - "0,1".to_string(), // overwrite's default 0% utilization - ]; - - let thresholds = parse_user_thresholds(&args, ','); - assert_eq!(thresholds.len(), 2); - assert_eq!(thresholds.get(&0), Some(&1)); - } - - #[test] - #[should_panic(expected="Error: Bad , string: ''")] - fn user_threshold_empty() { - parse_user_thresholds(&vec!["".to_string()], ','); - } - - #[test] - #[should_panic(expected="Error: Bad age string: '': cannot parse integer from empty string")] - fn user_threshold_digit_empty() { - parse_user_thresholds(&vec!["1,".to_string()], ','); - } - - #[test] - #[should_panic(expected="Error: Bad utilization string: '': cannot parse integer from empty string")] - fn user_threshold_empty_digit() { - parse_user_thresholds(&vec![",1".to_string()], ','); - } - - #[test] - #[should_panic(expected="Error: Bad utilization string: 'a': invalid digit found in string")] - fn user_threshold_alpha_empty() { - 
parse_user_thresholds(&vec!["a,".to_string()], ','); - } - - #[test] - #[should_panic(expected="Error: Bad utilization string: '': cannot parse integer from empty string")] - fn user_threshold_empty_alpha() { - parse_user_thresholds(&vec![",a".to_string()], ','); - } - - #[test] - #[should_panic(expected="Error: Utilization can be between 0% and 100%. Got '200'")] - fn user_threshold_too_big() { - parse_user_thresholds(&vec!["200,".to_string()], ','); - } - - #[test] - #[should_panic(expected="Error: File age must be strictly monotonically decreasing. Found 10,1 -> 20,1")] - fn user_threshold_same_ages() { - parse_user_thresholds(&vec!["10,1".to_string(), "20,1".to_string()], ','); - } - - #[test] - #[should_panic(expected="Error: File age must be strictly monotonically decreasing. Found 10,1 -> 20,2")] - fn user_threshold_increasing_ages() { - parse_user_thresholds(&vec!["10,1".to_string(), "20,2".to_string()], ','); - } - - #[test] - fn util2age_good() { - // low utilization -> flush older files - // high utilization -> flush recent files - let args = vec![ - "10,90".to_string(), - "20,80".to_string(), - "30,70".to_string(), - "40,60".to_string(), - "50,50".to_string(), - "60,40".to_string(), - "70,30".to_string(), - "80,20".to_string(), - "90,10".to_string(), - ]; - - let thresholds = parse_user_thresholds(&args, ','); - assert_eq!(thresholds.len(), 11); - - assert_eq!(util2age(&thresholds, 05), 90); - assert_eq!(util2age(&thresholds, 15), 80); - assert_eq!(util2age(&thresholds, 25), 70); - assert_eq!(util2age(&thresholds, 35), 60); - assert_eq!(util2age(&thresholds, 45), 50); - assert_eq!(util2age(&thresholds, 55), 40); - assert_eq!(util2age(&thresholds, 65), 30); - assert_eq!(util2age(&thresholds, 75), 20); - assert_eq!(util2age(&thresholds, 85), 10); - assert_eq!(util2age(&thresholds, 95), 00); - } - - #[test] - fn util2age_empty() { - let thresholds = parse_user_thresholds(&vec![], ','); - assert_eq!(thresholds.len(), 2); - util2age(&thresholds, 0); - } - - 
#[test] - #[should_panic(expected = "Error: Utilization percentage not found")] - fn util2age_gt_100() { - let thresholds = parse_user_thresholds(&vec![], ','); - assert_eq!(thresholds.len(), 2); - util2age(&thresholds, 200); - } -} diff --git a/flush_ls/Cargo.lock b/obj_scanner/Cargo.lock similarity index 62% rename from flush_ls/Cargo.lock rename to obj_scanner/Cargo.lock index 75967574..148f9064 100644 --- a/flush_ls/Cargo.lock +++ b/obj_scanner/Cargo.lock @@ -2,6 +2,27 @@ # It is not intended for manual editing. version = 3 +[[package]] +name = "ahash" +version = "0.8.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e89da841a80418a9b391ebaea17f5c112ffaaa96f621d2c285b5174da76b9011" +dependencies = [ + "cfg-if", + "once_cell", + "version_check", + "zerocopy", +] + +[[package]] +name = "aho-corasick" +version = "1.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e60d3430d3a69478ad0993f19238d2df97c507009a52b3c10addcd7f6bcb916" +dependencies = [ + "memchr", +] + [[package]] name = "anstream" version = "0.6.17" @@ -57,6 +78,15 @@ version = "2.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b048fb63fd8b5923fc5aa7b340d8e156aec7ec02f0c78fa8a6ddc2613f6f71de" +[[package]] +name = "cc" +version = "1.1.37" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "40545c26d092346d8a8dab71ee48e7685a7a9cba76e634790c215b41a4a7b4cf" +dependencies = [ + "shlex", +] + [[package]] name = "cfg-if" version = "1.0.0" @@ -119,6 +149,18 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "fallible-iterator" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2acce4a10f12dc2fb14a218589d4f1f62ef011b2d0cc4b3cb1bba8e94da14649" + +[[package]] +name = "fallible-streaming-iterator" +version = "0.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"7360491ce676a36bf9bb3c56c1aa791658183a54d2744120f27285738d90465a" + [[package]] name = "fastrand" version = "2.1.1" @@ -132,7 +174,28 @@ dependencies = [ "clap", "errno", "libc", + "regex", + "rusqlite", "tempfile", + "threadpool", +] + +[[package]] +name = "hashbrown" +version = "0.14.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" +dependencies = [ + "ahash", +] + +[[package]] +name = "hashlink" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6ba4ff7128dee98c7dc9794b6a411377e1404dba1c97deb8d1a55297bd25d8af" +dependencies = [ + "hashbrown", ] [[package]] @@ -141,6 +204,12 @@ version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" +[[package]] +name = "hermit-abi" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d231dfb89cfffdbc30e7fc41579ed6066ad03abda9e567ccafae602b97ec5024" + [[package]] name = "is_terminal_polyfill" version = "1.70.1" @@ -153,18 +222,51 @@ version = "0.2.161" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8e9489c2807c139ffd9c1794f4af0ebe86a828db53ecdc7fea2111d0fed085d1" +[[package]] +name = "libsqlite3-sys" +version = "0.30.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2e99fb7a497b1e3339bc746195567ed8d3e24945ecd636e3619d20b9de9e9149" +dependencies = [ + "cc", + "pkg-config", + "vcpkg", +] + [[package]] name = "linux-raw-sys" version = "0.4.14" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "78b3ae25bc7c8c38cec158d1f2757ee79e9b3740fbc7ccf0e59e4b08d793fa89" +[[package]] +name = "memchr" +version = "2.7.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "78ca9ab1a0babb1e7d5695e3530886289c18cf2f87ec19a575a0abdce112e3a3" + 
+[[package]] +name = "num_cpus" +version = "1.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4161fcb6d602d4d2081af7c3a45852d875a03dd337a6bfdd6e06407b61342a43" +dependencies = [ + "hermit-abi", + "libc", +] + [[package]] name = "once_cell" version = "1.20.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1261fe7e33c73b354eab43b1273a57c8f967d0391e80353e51f764ac02cf6775" +[[package]] +name = "pkg-config" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "953ec861398dccce10c670dfeaf3ec4911ca479e9c02154b3a215178c5f566f2" + [[package]] name = "proc-macro2" version = "1.0.89" @@ -183,6 +285,49 @@ dependencies = [ "proc-macro2", ] +[[package]] +name = "regex" +version = "1.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b544ef1b4eac5dc2db33ea63606ae9ffcfac26c1416a2806ae0bf5f56b201191" +dependencies = [ + "aho-corasick", + "memchr", + "regex-automata", + "regex-syntax", +] + +[[package]] +name = "regex-automata" +version = "0.4.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "368758f23274712b504848e9d5a6f010445cc8b87a7cdb4d7cbee666c1288da3" +dependencies = [ + "aho-corasick", + "memchr", + "regex-syntax", +] + +[[package]] +name = "regex-syntax" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b15c43186be67a4fd63bee50d0303afffcef381492ebe2c5d87f324e1b8815c" + +[[package]] +name = "rusqlite" +version = "0.32.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7753b721174eb8ff87a9a0e799e2d7bc3749323e773db92e0984debb00019d6e" +dependencies = [ + "bitflags", + "fallible-iterator", + "fallible-streaming-iterator", + "hashlink", + "libsqlite3-sys", + "smallvec", +] + [[package]] name = "rustix" version = "0.38.37" @@ -196,6 +341,18 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "shlex" +version = 
"1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" + +[[package]] +name = "smallvec" +version = "1.13.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3c5e1a9a646d36c3599cd173a41282daf47c44583ad367b8e6837255952e5c67" + [[package]] name = "strsim" version = "0.11.1" @@ -226,6 +383,15 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "threadpool" +version = "1.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d050e60b33d41c19108b32cea32164033a9013fe3b46cbd4457559bfbf77afaa" +dependencies = [ + "num_cpus", +] + [[package]] name = "unicode-ident" version = "1.0.13" @@ -238,6 +404,18 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" +[[package]] +name = "vcpkg" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" + +[[package]] +name = "version_check" +version = "0.9.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a" + [[package]] name = "windows-sys" version = "0.52.0" @@ -319,3 +497,23 @@ name = "windows_x86_64_msvc" version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec" + +[[package]] +name = "zerocopy" +version = "0.7.35" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1b9b4fd18abc82b8136838da5d50bae7bdea537c574d8dc1a34ed098d6c166f0" +dependencies = [ + "zerocopy-derive", +] + +[[package]] +name = "zerocopy-derive" +version = "0.7.35" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"fa4f8080344d4671fb4e831a13ad1e68092748387dfc4f55e356242fae12ce3e" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] diff --git a/flush_ls/Cargo.toml b/obj_scanner/Cargo.toml similarity index 74% rename from flush_ls/Cargo.toml rename to obj_scanner/Cargo.toml index fc76f2bd..f2e063e3 100644 --- a/flush_ls/Cargo.toml +++ b/obj_scanner/Cargo.toml @@ -9,4 +9,7 @@ edition = "2021" clap = { version = "4.5.20", features = ["derive"] } errno = "0.3.9" libc = "0.2.161" +regex = "1.11.1" +rusqlite = { version = "0.32.1", features = ["bundled"] } tempfile = "3.13.0" +threadpool = "1.8.1" diff --git a/obj_scanner/example.config b/obj_scanner/example.config new file mode 100644 index 00000000..71faba45 --- /dev/null +++ b/obj_scanner/example.config @@ -0,0 +1,29 @@ +# General Format: +# Label: value +# +# order does not matter +# + +# reftime: <seconds since Unix epoch> +# last reftime will be used +# can be overridden by commandline argument +reftime: 0 + +# threshold: <utilization>,<age> +# multiple threshold labels will be combined into a set of unique pairs +# duplicate utilization values will be overwritten by the last duplicate +threshold: 10,90 +threshold: 20,80 +threshold: 30,70 +threshold: 40,60 +threshold: 50,50 +threshold: 60,40 +threshold: 70,30 +threshold: 80,20 +threshold: 90,10 + +# blacklist: <regexp> +# multiple blacklist labels will be aggregated +# all blacklist regexps will be run in the order listed +# duplicates are not removed +blacklist: ^.*\.inprog$ diff --git a/obj_scanner/src/config.rs b/obj_scanner/src/config.rs new file mode 100644 index 00000000..3bd4a441 --- /dev/null +++ b/obj_scanner/src/config.rs @@ -0,0 +1,507 @@ +/* +Copyright (c) 2015, Los Alamos National Security, LLC +All rights reserved. + +Copyright 2015. Los Alamos National Security, LLC. This software was +produced under U.S. Government contract DE-AC52-06NA25396 for Los +Alamos National Laboratory (LANL), which is operated by Los Alamos +National Security, LLC for the U.S. Department of Energy. The +U.S. 
Government has rights to use, reproduce, and distribute this +software. NEITHER THE GOVERNMENT NOR LOS ALAMOS NATIONAL SECURITY, +LLC MAKES ANY WARRANTY, EXPRESS OR IMPLIED, OR ASSUMES ANY LIABILITY +FOR THE USE OF THIS SOFTWARE. If software is modified to produce +derivative works, such modified software should be clearly marked, so +as not to confuse it with the version available from LANL. + +Additionally, redistribution and use in source and binary forms, with +or without modification, are permitted provided that the following +conditions are met: 1. Redistributions of source code must retain the +above copyright notice, this list of conditions and the following +disclaimer. + +2. Redistributions in binary form must reproduce the above copyright +notice, this list of conditions and the following disclaimer in the +documentation and/or other materials provided with the distribution. +3. Neither the name of Los Alamos National Security, LLC, Los Alamos +National Laboratory, LANL, the U.S. Government, nor the names of its +contributors may be used to endorse or promote products derived from +this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY LOS ALAMOS NATIONAL SECURITY, LLC AND +CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, +BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND +FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL LOS +ALAMOS NATIONAL SECURITY, LLC OR CONTRIBUTORS BE LIABLE FOR ANY +DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE +GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER +IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR +OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF +ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
+ +----- +NOTE: +----- +MarFS is released under the BSD license. + +MarFS was reviewed and released by LANL under Los Alamos Computer Code +identifier: LA-CC-15-039. + +MarFS uses libaws4c for Amazon S3 object communication. The original +version is at https://aws.amazon.com/code/Amazon-S3/2601 and under the +LGPL license. LANL added functionality to the original work. The +original work plus LANL contributions is found at +https://github.com/jti-lanl/aws4c. + +GNU licenses can be found at http://www.gnu.org/licenses/. +*/ + + +use regex::Regex; +use std::collections::BTreeMap; +use std::fs::File; +use std::io::{BufRead, BufReader}; +use std::path::PathBuf; +use std::time::{Duration, SystemTime, SystemTimeError}; + +/** + * This struct represents the contents of a config file. + * + * The config file contains key-value pairs separated by the ':' character. + * Key-value pairs may appear in any order. + * Duplicate key-value pairs may appear. How they are handled depends on the key. + */ +#[derive(Clone)] +pub struct Config { + // Reference Timestamp (Seconds Since Unix Epoch) + reftime: SystemTime, + + // Mapping from system utilization to allowed file age + // using BTreeMap to remove duplicates and sort on keys + thresholds: BTreeMap, + + // Regular expressions for basenames to ignore + blacklist: Vec, +} + +impl Config { + const LABEL_DELIM: char = ':'; + const THRESHOLD_DELIM: char = ','; + + fn new() -> Config { + Self { + reftime: SystemTime::UNIX_EPOCH, + thresholds: BTreeMap::from([ + (0, u64::MAX), // if utilization is at 0%, don't flush anything + (100, 0), // if utilization is at 100%, flush everything + ]), + blacklist: Vec::new(), + } + } + + pub fn from_pathbuf(path: PathBuf) -> Config { + let mut config = Config::new(); + + let file = match File::open(&path) { + Ok(f) => f, + Err(msg) => panic!("Error: Could not open flush config file {}: {}", + path.display(), msg), + }; + + // parse file + // labels can appear in any order + // processing function 
determines if the new value is overwrites the old value or is aggregated + for line_res in BufReader::new(file).lines() { + let line = match line_res { + Ok(line) => line, + Err(msg) => panic!("Error: Could not read line from {}: {}", + path.display(), msg), + }; + + config.process_line(&line); + } + + config.verify_thresholds(); + + config + } + + fn process_line(&mut self, line: &str) { + if line.len() == 0 { + return; + } + + if line.chars().nth(0) == Some('#') { + return; + } + + match line.split_once(Self::LABEL_DELIM) { + Some((label, value)) => { + match label.trim() { + "reftime" => self.set_reftime_str(value), + "threshold" => self.add_to_threshold(value), + "blacklist" => self.add_to_blacklist(value), + _ => panic!("Error: Unknown config label: {}", label), + }; + + }, + None => panic!("Error: Did not find separator in line: {}", line), + }; + } + + pub fn set_reftime(&mut self, reftime: u64) { + self.reftime = SystemTime::UNIX_EPOCH + Duration::from_secs(reftime); + } + + fn set_reftime_str(&mut self, time_str: &str) { + match time_str.trim().parse::() { + Ok(reftime) => self.set_reftime(reftime), + Err(msg) => panic!("Error: Could not convert {} into a timestamp: {}", + time_str, msg), + }; + } + + /** + * Convert threshold strings from the config file to integers and + * insert them into a map. + * + * Format: + * threshold: , + * + * Utilization is an integer representing utilization percentage. + * An integer is required because rust does not have an Ord trait + * defined for f32 and f64. https://stackoverflow.com/a/69117941/341683 + * + * Age is integer number of seconds since Jan 1, 1970 00:00:00 UTC. 
+ * + * Examples: + * + * threshold: 10,60 + * threshold:20, 1 + */ + fn add_to_threshold(&mut self, pair_str: &str) { + match pair_str.trim().split_once(Self::THRESHOLD_DELIM) { + Some((util_str, age_str)) => { + let util = match util_str.trim().parse::() { + Ok(val) => val, + Err(msg) => panic!("Error: Bad utilization string: '{}': {}", util_str, msg), + }; + + if util > 100 { + panic!("Error: Utilization can be between 0% and 100%. Got '{}'", util); + } + + let age = match age_str.trim().parse::() { + Ok(val) => val, + Err(msg) => panic!("Error: Bad age string: '{}': {}", age_str, msg), + }; + + self.thresholds.insert(util, age); + }, + None => panic!("Error: Bad , string: '{}'", pair_str), + } + } + + pub fn add_to_blacklist(&mut self, regex: &str) { + match Regex::new(regex.trim()) { + Ok(re) => self.blacklist.push(re), + Err(msg) => panic!("Error: Bad regex pattern: {}: {}", regex, msg), + }; + } + + /** + * Check for monotonically decreasing file ages. + * + * Call this function after the entire config file has been + * processed + */ + fn verify_thresholds(&self) { + let mut prev = self.thresholds.first_key_value(); + for (utilization, age) in self.thresholds.iter().skip(1) { + if age >= prev.unwrap().1 { + panic!("Error: File age must be strictly monotonically decreasing. Found {},{} -> {},{}", + prev.unwrap().0, prev.unwrap().1, utilization, age); + } + + prev = Some((&utilization, &age)) + } + } + + // this will error if the input is later than the reftime + // (duration is negative) - propogate the error to let the + // caller handle it + pub fn file_age(&self, timestamp: SystemTime) -> Result { + self.reftime.duration_since(timestamp) + } + + /** + * Select how old files are allowed to be given system + * utilization and thresholds + * + * Example: + * thresholds: + * 10 -> 60 + * 20 -> 1 + * + * If the utilization is less than or equal to 10%, files older than + * 60 seconds should be flushed. 
If the utilization is greater than + * 10% and less than or equal to 20%, files older than 1 second should + * be flushed. + * + * TODO: Change to BTreeMap::upper_bound once btree_cursors is merged. + * + * @param utilization system utilization + * @return file age limit in seconds + */ + pub fn util2age(&self, utilization: u8) -> u64 { + // return the age associated with the first threshold + // that is greater than or equal to the utilization + for (threshold, age) in self.thresholds.iter() { + if *threshold >= utilization { + return *age; + } + } + + // this line does double duty as a compiler silencer + // and as an invalid utilization value check + panic!("Error: Utilization percentage not found"); + } + + pub fn is_blacklisted(&self, file_name: &str) -> bool { + for regex in &self.blacklist { + if regex.is_match(file_name) { + return true; + } + } + + false + } +} + +#[cfg(test)] +mod tests { + use crate::config::Config; + use std::path::PathBuf; + use std::time::SystemTime; + use tempfile::NamedTempFile; + + #[test] + #[should_panic(expected = "Error: Could not convert a into a timestamp: invalid digit found in string")] + fn set_reftime_str_bad() { + let mut config = Config::new(); + config.set_reftime_str("a"); + } + + #[test] + fn set_reftime() { + let mut config = Config::new(); + config.set_reftime(0); + + assert_eq!(config.reftime, SystemTime::UNIX_EPOCH); + } + + #[test] + fn newer_reftime() { + let mut config = Config::new(); + config.set_reftime(0); + + assert_eq!(config.file_age(SystemTime::now()).is_err(), true); + } + + #[test] + fn user_threshold_good_single() { + let mut config = Config::new(); + config.add_to_threshold("1,1"); + + assert_eq!(config.thresholds.len(), 3); + assert_eq!(config.thresholds.get(&1), Some(&1)); + } + + #[test] + fn user_threshold_good_multiple() { + let mut config = Config::new(); + config.add_to_threshold("1,2"); + config.add_to_threshold("2,1"); + + assert_eq!(config.thresholds.len(), 4); + 
assert_eq!(config.thresholds.get(&1), Some(&2)); + assert_eq!(config.thresholds.get(&2), Some(&1)); + } + + #[test] + fn user_threshold_good_repeat() { + let mut config = Config::new(); + config.add_to_threshold("0,1"); + + assert_eq!(config.thresholds.len(), 2); + assert_eq!(config.thresholds.get(&0), Some(&1)); + } + + #[test] + #[should_panic(expected="Error: Bad , string: ''")] + fn user_threshold_empty() { + let mut config = Config::new(); + config.add_to_threshold(""); + } + + #[test] + #[should_panic(expected="Error: Bad age string: '': cannot parse integer from empty string")] + fn user_threshold_digit_empty() { + let mut config = Config::new(); + config.add_to_threshold("1,"); + } + + #[test] + #[should_panic(expected="Error: Bad utilization string: '': cannot parse integer from empty string")] + fn user_threshold_empty_digit() { + let mut config = Config::new(); + config.add_to_threshold(",1"); + } + + #[test] + #[should_panic(expected="Error: Bad utilization string: 'a': invalid digit found in string")] + fn user_threshold_alpha_empty() { + let mut config = Config::new(); + config.add_to_threshold("a,"); + } + + #[test] + #[should_panic(expected="Error: Bad utilization string: '': cannot parse integer from empty string")] + fn user_threshold_empty_alpha() { + let mut config = Config::new(); + config.add_to_threshold(",a"); + } + + #[test] + #[should_panic(expected="Error: Utilization can be between 0% and 100%. Got '200'")] + fn user_threshold_too_big() { + let mut config = Config::new(); + config.add_to_threshold("200,"); + } + + #[test] + #[should_panic(expected="Error: File age must be strictly monotonically decreasing. Found 10,1 -> 20,1")] + fn user_threshold_same_ages() { + let mut config = Config::new(); + config.add_to_threshold("10,1"); + config.add_to_threshold("20,1"); + config.verify_thresholds(); + } + + #[test] + #[should_panic(expected="Error: File age must be strictly monotonically decreasing. 
Found 10,1 -> 20,2")] + fn user_threshold_increasing_ages() { + let mut config = Config::new(); + config.add_to_threshold("10,1"); + config.add_to_threshold("20,2"); + config.verify_thresholds(); + } + + #[test] + #[should_panic(expected = "Error: Utilization percentage not found")] + fn util2age_gt_100() { + let config = Config::new(); + let _ = config.util2age(200); + } + + #[test] + #[should_panic(expected = "Error: Bad regex pattern: \\: regex parse error:")] + fn bad_regex() { + let mut config = Config::new(); + config.add_to_blacklist("\\"); + } + + #[test] + fn no_blacklist() { + let config = Config::new(); + + assert_eq!(config.is_blacklisted(""), false); + assert_eq!(config.is_blacklisted("test"), false); + assert_eq!(config.is_blacklisted("match"), false); + assert_eq!(config.is_blacklisted("matching"), false); + } + + #[test] + fn blacklist() { + let mut config = Config::new(); + config.add_to_blacklist("^match.+$"); + + assert_eq!(config.is_blacklisted(""), false); + assert_eq!(config.is_blacklisted("test"), false); + assert_eq!(config.is_blacklisted("match"), false); + assert_eq!(config.is_blacklisted("matching"), true); + } + + #[test] + #[should_panic(expected = "Error: Did not find separator in line: bad label")] + fn missing_separator() { + let mut config = Config::new(); + config.process_line("bad label"); + } + + #[test] + #[should_panic(expected = "Error: Unknown config label: bad label")] + fn bad_label() { + let mut config = Config::new(); + config.process_line("bad label: "); + } + + #[test] + #[should_panic(expected = "Error: Could not open flush config file")] + fn nonexistant_config() { + let file = NamedTempFile::new().unwrap(); + let path = PathBuf::from(file.path()); + let _ = file.close(); + + let _ = Config::from_pathbuf(path); + } + + #[test] + fn empty_config() { + let config = Config::new(); + + assert_eq!(config.thresholds.len(), 2); + assert_eq!(config.blacklist.len(), 0); + } + + #[test] + fn example_config() { + let mut path = 
PathBuf::from(env!("CARGO_MANIFEST_DIR")); + path.push("example.config"); + + let config = Config::from_pathbuf(path); + + assert_eq!(config.reftime, SystemTime::UNIX_EPOCH); + + assert_eq!(config.thresholds.len(), 11); + + // get raw values + assert_eq!(config.thresholds.get(&000), Some(&u64::MAX)); + assert_eq!(config.thresholds.get(&010), Some(&90)); + assert_eq!(config.thresholds.get(&020), Some(&80)); + assert_eq!(config.thresholds.get(&030), Some(&70)); + assert_eq!(config.thresholds.get(&040), Some(&60)); + assert_eq!(config.thresholds.get(&050), Some(&50)); + assert_eq!(config.thresholds.get(&060), Some(&40)); + assert_eq!(config.thresholds.get(&070), Some(&30)); + assert_eq!(config.thresholds.get(&080), Some(&20)); + assert_eq!(config.thresholds.get(&090), Some(&10)); + assert_eq!(config.thresholds.get(&100), Some(&00)); + + // get file age given utilization + assert_eq!(config.util2age(00), u64::MAX); + assert_eq!(config.util2age(05), 90); + assert_eq!(config.util2age(15), 80); + assert_eq!(config.util2age(25), 70); + assert_eq!(config.util2age(35), 60); + assert_eq!(config.util2age(45), 50); + assert_eq!(config.util2age(55), 40); + assert_eq!(config.util2age(65), 30); + assert_eq!(config.util2age(75), 20); + assert_eq!(config.util2age(85), 10); + assert_eq!(config.util2age(95), 00); + + assert_eq!(config.blacklist.len(), 1); + } +} diff --git a/obj_scanner/src/main.rs b/obj_scanner/src/main.rs new file mode 100644 index 00000000..0e45f99f --- /dev/null +++ b/obj_scanner/src/main.rs @@ -0,0 +1,591 @@ +/* +Copyright (c) 2015, Los Alamos National Security, LLC +All rights reserved. + +Copyright 2015. Los Alamos National Security, LLC. This software was +produced under U.S. Government contract DE-AC52-06NA25396 for Los +Alamos National Laboratory (LANL), which is operated by Los Alamos +National Security, LLC for the U.S. Department of Energy. The +U.S. Government has rights to use, reproduce, and distribute this +software. 
NEITHER THE GOVERNMENT NOR LOS ALAMOS NATIONAL SECURITY, +LLC MAKES ANY WARRANTY, EXPRESS OR IMPLIED, OR ASSUMES ANY LIABILITY +FOR THE USE OF THIS SOFTWARE. If software is modified to produce +derivative works, such modified software should be clearly marked, so +as not to confuse it with the version available from LANL. + +Additionally, redistribution and use in source and binary forms, with +or without modification, are permitted provided that the following +conditions are met: 1. Redistributions of source code must retain the +above copyright notice, this list of conditions and the following +disclaimer. + +2. Redistributions in binary form must reproduce the above copyright +notice, this list of conditions and the following disclaimer in the +documentation and/or other materials provided with the distribution. +3. Neither the name of Los Alamos National Security, LLC, Los Alamos +National Laboratory, LANL, the U.S. Government, nor the names of its +contributors may be used to endorse or promote products derived from +this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY LOS ALAMOS NATIONAL SECURITY, LLC AND +CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, +BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND +FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL LOS +ALAMOS NATIONAL SECURITY, LLC OR CONTRIBUTORS BE LIABLE FOR ANY +DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE +GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER +IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR +OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF +ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +----- +NOTE: +----- +MarFS is released under the BSD license. 
+ +MarFS was reviewed and released by LANL under Los Alamos Computer Code +identifier: LA-CC-15-039. + +MarFS uses libaws4c for Amazon S3 object communication. The original +version is at https://aws.amazon.com/code/Amazon-S3/2601 and under the +LGPL license. LANL added functionality to the original work. The +original work plus LANL contributions is found at +https://github.com/jti-lanl/aws4c. + +GNU licenses can be found at http://www.gnu.org/licenses/. +*/ + +use clap::{Args, Parser}; +use errno; +use regex::Regex; +use rusqlite::{Connection, OpenFlags}; +use std::cmp::min; +use std::fs; +use std::io::Write; +use std::path::PathBuf; +use std::sync::{Arc, mpsc}; +use std::time::{Duration, SystemTime}; +use threadpool::ThreadPool; + +mod config; + +#[derive(Debug, Args, Clone)] +#[group(required=true, multiple=true)] +struct Ops { + #[arg(long, help="Print leaf files to flush")] + flush: bool, + + #[arg(long, help="Print leaf files to push")] + push: bool, +} + +#[derive(Clone)] +struct FlushPush { + config: config::Config, + ops: Ops, + must_match: Option, + force: bool, +} + +// list of paths to write to output files +#[derive(Clone)] +struct Output { + push: mpsc::Sender, + flush: mpsc::Sender, +} + +// path components to leaves of marfs data tree +static PATH_SEGS: &[&str] = &[ + "pod", + "block", + "cap", + "scat" +]; + +// ////////////////////////////////////// +// Constants for database containing files that have been pushed +// +// If a file is selected for pushing, log it here as though it has +// been pushed so that it will not show up again next time this is run +// +// If a file is selected for flushing, it will be removed from the +// database +static PUSHDB_NAME: &str = "push.db"; + +// pattern for blacklisting this file +static PUSHDB_REGEX: &str = "^push\\.db$"; + +/** + * Open and/or create the PUSH database + * Tables are set up if they don't exist + * + * @param path /pod[0-9]+/block[0-9]+/cap[0-9]+/scat[0-9]+/ + * @return a handle to the PUSH 
database + */ +fn open_pushdb(path: &PathBuf) -> Result { + let mut dbname = path.clone(); + dbname.push(PUSHDB_NAME); + + match Connection::open_with_flags(&dbname, + OpenFlags::SQLITE_OPEN_CREATE | + OpenFlags::SQLITE_OPEN_READ_WRITE) { + Ok(conn) => { + // these can't really fail + let _ = conn.execute_batch("CREATE TABLE IF NOT EXISTS push (name TEXT PRIMARY KEY, mtime int64);"); + let _ = conn.execute_batch("CREATE TABLE IF NOT EXISTS mtime (oldest int64);"); + Ok(conn) + }, + Err(msg) => Err(format!("Could not open PUSH database {}: {}", + dbname.display(), msg)), + } +} +// ////////////////////////////////////// + +/** + * Get the filesystem utilization of the provided path + * using statvfs, rounded down to the nearest integer. + * + * @param path /pod[0-9]+/block[0-9]+/cap[0-9]+/scat[0-9]+/ + * @return utilization + */ +fn get_utilization(path: &PathBuf) -> Result { + // get the leaf's utilization + unsafe { + use libc; + use std::ffi::CString; + use std::mem; + + let path_cstr = CString::new(path.display().to_string()).unwrap(); + let mut vfs_st: libc::statvfs = mem::zeroed(); + + if libc::statvfs(path_cstr.as_ptr(), &mut vfs_st as *mut libc::statvfs) < 0 { + return Err(errno::errno()); + } + + Ok((100 - vfs_st.f_bfree * 100 / vfs_st.f_blocks) as u8) + } +} + +/** + * Process files under /pod[0-9]+/block[0-9]+/cap[0-9]+/scat[0-9]+/ + * + * if this file's basename is allowed + * if FLUSH + * if (reftime - file.mtime) > age + * print the file's path and delete it from the PUSH db + * + * if file was not flushed, record min(mtime) + * + * if PUSH + * insert the path into the PUSH db and print the file's path + * + * @param path /pod[0-9]+/block[0-9]+/cap[0-9]+/scat[0-9]+/ + * @param fp flush/push config + * @param output send flush and push tx channel endpoints + */ +fn process_leaf(path: PathBuf, fp: Arc, output: Output) { + // open/create the PUSH database + let pushdb = match open_pushdb(&path) { + Ok(conn) => conn, + Err(msg) => { eprintln!("Error: {}", 
msg); return; }, + }; + + // get previously existing oldest mtime, if one exists + // + // if a previous mtime was not found, default to UNIX_EPOCH: new + // mtimes must come after UNIX_EPOCH, so every file will pass the + // mtime check + let prev_oldest_mtime = SystemTime::UNIX_EPOCH + + match pushdb.query_row::("SELECT oldest FROM mtime;", [], |row| row.get(0)) { + Ok(val) => Duration::from_secs(val), + Err(_) => Duration::from_secs(0), + }; + + // get the leaf's utilization + let util = match get_utilization(&path) { + Ok(val) => val, + Err(msg) => { eprintln!("Warning: Getting utilization for {} failed: {}", path.display(), msg); return; }, + }; + + // figure out the file age limit + // get here to not do repeated searches + let max_age = fp.config.util2age(util); + + // use a time that all files should have been created before + // assumes all system clocks are not off by more than a year + let future_mtime = SystemTime::now() + Duration::from_secs(60 * 60 * 24 * 365); + let mut curr_oldest_mtime = future_mtime.clone(); + + let entries = match fs::read_dir(&path) { + Ok(list) => list, + Err(msg) => { eprintln!("Error: Could not read_dir {}: {}", path.display(), msg); return; }, + }; + + // loop through leaf directory and find files older than the limit + for entry_res in entries { + let entry = match entry_res { + Ok(entry) => entry, + Err(msg) => { eprintln!("Warning: Could not get entry: {}", msg); continue; }, + }; + + let child = entry.path(); + + if let Ok(entry_type) = entry.file_type() { + // only expecting files under leaf directories + if !entry_type.is_file() { + eprintln!("Warning: {} is not a file. Skipping.", child.display()); + continue; + } + } else { + eprintln!("Warning: Could not get type of {}. 
Skipping.", child.display()); + continue; + } + + if let Ok(st) = child.metadata() { + if let Ok(mtime) = st.modified() { + if let Ok(file_age) = fp.config.file_age(mtime) { + let basename = child.file_name().unwrap().to_str().unwrap(); + + // basename must match in order to be processed + if let Some(whitelist) = &fp.must_match { + if !whitelist.is_match(&basename) { + continue; + } + } + + // do not process blacklisted files + if fp.config.is_blacklisted(&basename) { + continue; + } + + // If the FLUSH flag was specified, target object + // components will be selected if their mtime + // value is older than the value corresponding to + // the current FS fullness percentage, as + // specified in the config file. + // + // Do actual FLUSH later in order to track mtime + // + // Can mtime < prev_oldest_mtime happen? + let flush = fp.ops.flush && + (fp.force || ((mtime >= prev_oldest_mtime) && (file_age.as_secs() > max_age))); + + // keep track of the oldest mtime value amongst + // all encountered files, excluding those for + // which a FLUSH op is targeted, regardless of + // whether they match any white/blacklist object + // regex. 
+ if !flush && (mtime < curr_oldest_mtime) { + curr_oldest_mtime = mtime; + } + + // process FLUSH here + if flush { + let _ = output.flush.send(child.clone()); + let _ = pushdb.execute(&format!("DELETE FROM push WHERE name == '{}';", basename), []); + continue; // flushed files are done being processed + } + + if !fp.ops.push { + continue; + } + + let msecs = mtime.duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs(); + + // if the path is not already in the PUSH db, queue it for printing + if let Ok(_) = pushdb.execute(&format!("INSERT INTO push (name, mtime) VALUES ('{}', {});", + basename, msecs), []) { + let _ = output.push.send(child.clone()); + continue; // done + } + + // file is already listed - check the timestamp + match pushdb.query_row::(&format!("SELECT mtime FROM push WHERE name == '{}';", + basename), [], |row| row.get(0)) { + Ok(old_mtime) => { + let old_mtime = SystemTime::UNIX_EPOCH + Duration::from_secs(old_mtime); + if old_mtime < mtime { + // attempt to update PUSH db + if let Err(msg) = pushdb.execute(&format!("UPDATE push SET mtime = {} WHERE name == '{}';", + msecs, basename), []) { + eprintln!("Error: Could not update mtime of previously pushed file {}: {}", + child.display(), msg); + } + + // add to PUSH file no matter what + let _ = output.push.send(child.clone()); + } else { + // add to PUSH file anyways + if fp.force { + let _ = output.push.send(child.clone()); + } + } + }, + Err(msg) => { + eprintln!("Warning: Could not get existing mtime for {}, so pushing again: {}", + child.display(), msg); + let _ = output.push.send(child.clone()); + }, + }; + } + } + } + } + + // if the oldest mtime was updated (this directory has at least 1 file), write it to PUSH db + if (curr_oldest_mtime > prev_oldest_mtime) && + (curr_oldest_mtime != future_mtime) { + let msecs = curr_oldest_mtime.duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs(); + + // remove all rows just in case there's more than 1 row + let _ = pushdb.execute_batch("DELETE 
FROM mtime;"); + + // insert the new oldest mtime + let _ = pushdb.execute(&format!("INSERT INTO mtime (oldest) VALUES ({});", msecs), []); + } +} + +/** + * Find directories matching /pod[0-9]+/block[0-9]+/cap[0-9]+/scat[0-9]+ + * + * @param path and lower + * @param level which path segment is currently being processed + * @param fp flush/push config + * @param output send flush and push tx channel endpoints + * @param threadpool the threadpool that is processing child work + */ +fn process_non_leaf(path: PathBuf, level: usize, + fp: Arc, + output: Output, + threadpool: ThreadPool) { + if level == 4 { + // panic on > 4? + threadpool.execute(move || { + process_leaf(path, fp, output); + }); + return; + } + + let entries = match fs::read_dir(&path) { + Ok(list) => list, + Err(msg) => { + eprintln!("Warning: Could not read_dir {}: {}", path.display(), msg); + return; + }, + }; + + let expected = PATH_SEGS[level]; + let len = expected.len(); + + // find paths that match current marfs path segment + for entry_res in entries { + let entry = match entry_res { + Ok(entry) => entry, + Err(msg) => { + eprintln!("Warning: Could not get entry: {}", msg); + continue; + }, + }; + + let child = entry.path(); + + if child.is_dir() == false { + continue; + } + + // make sure current basename has expected path segment + if let Some(basename) = child.file_name() { + if len < basename.len() { + if let Some(basename_str) = basename.to_str() { + if &basename_str[0..len] != expected { + continue; + } + + if let Err(_) = &basename_str[len..].parse::() { + continue; + } + + let fp_clone = fp.clone(); + let threadpool_clone = threadpool.clone(); + let output_clone = output.clone(); + + threadpool.execute(move ||{ + process_non_leaf(child, level + 1, + fp_clone, + output_clone, + threadpool_clone); + }); + } + } + } + } +} + +/** + * Recurse down to /pod[0-9]+/block[0-9]+/cap[0-9]+/scat[0-9]+ + * and find files that are older than the provided age + * + * @param dal_root + * @param fp 
flush/push config + * @param output send flush and push tx channel endpoints + * @param nthreads the number of the threads that should be used to process this DAL root + */ +fn print_flushable_in_dal(dal_root: &PathBuf, + fp: FlushPush, + output: Output, + nthreads: usize) { + let path = dal_root.clone(); + let fp_arc = Arc::new(fp); + + // no need to clone output here + + let pool = ThreadPool::new(nthreads); + let pool_clone = pool.clone(); + + pool.execute(move || { + process_non_leaf(path, 0, fp_arc, output, pool_clone); + }); + + // Can't parallelize writes during processing: + // - need to drop original tx, but if write threads interrupt + // walk threads, they might complete before walk threads + // + // - write threads would sit on threads for the entire lifetime + // of the threadpool, blocking any work queued behind them, + // preventing completion + // - this threadpool doesn't steal work + // + // - spawning (nthreads - 2) walk threads and then spawning 2 + // more threads for writing does not seem correct + // + // - spawning nthreads walk threads and then spawning 2 more + // threads for writing does not seem correct either + // + // - not writing in walk threads in order to use locking of + // mpsc + // + // - threading rx clones/references at the end of each leaf to + // write paths in parallel doesn't make sense + // - just pass around file handles and locks + // + + pool.join(); +} + +/** + * Iterate through rx channel endpoints and write the paths to the + * appropriate files. 
+ * + * @param output_dir directory to place files at + * @param flush the rx channel endpoint containing flush paths + * @param push the rx channel endpoint containing push paths + * @param nthreads number of threads allowed to use to write output + */ +fn write_outputs(output_dir: PathBuf, ops: Ops, + flush: mpsc::Receiver, + push: mpsc::Receiver, + nthreads: usize) { + fn write_paths(pool: &ThreadPool, + output_dir: PathBuf, + name: &str, + paths: mpsc::Receiver) { + let name_str = String::from(name); + pool.execute(move || { + let mut output_path = output_dir; + output_path.push(name_str); + let mut output_file = match fs::File::create(output_path.clone()) { + Ok(file) => file, + Err(msg) => panic!("Error: Could not open flush file {}: {}", output_path.display(), msg), + }; + + for path in paths { + let _ = output_file.write_all((path.display().to_string() + "\n").as_bytes()); + } + }); + } + + let nfiles = if ops.flush { 1 } else { 0 } + if ops.push { 1 } else { 0 }; + let pool = ThreadPool::new(min(nthreads, nfiles)); + if ops.flush { + write_paths(&pool, output_dir.clone(), "flush", flush); + } + if ops.push { + write_paths(&pool, output_dir.clone(), "push", push); + } + pool.join(); +} + +#[derive(Parser, Debug)] +#[command()] +struct Cli { + #[arg(help="DAL root path")] + dal_root: PathBuf, + + #[arg(help="Path to config file")] + config_file: PathBuf, + + #[arg(help="Output Directory")] + output_dir: PathBuf, + + #[clap(flatten)] + ops: Ops, + + #[arg(short, long, value_name="regexp")] + #[arg(help="Regex pattern which objects must match in order to be eligible for any operation")] + must_match: Option, + + #[arg(short, long, default_value="1", help="Thread Count")] + nthreads: usize, + + #[arg(short, long, help="Force operation even if condition is not met")] + force: bool, + + #[arg(short, long, help="Overwrite config file reftime")] + reftime: Option, + + #[arg(short, long, help="Input path is a leaf dir")] + leaf: bool +} + +fn main() { + let cli = 
Cli::parse(); + + let mut fp = { FlushPush { + config: config::Config::from_pathbuf(cli.config_file.clone()), + ops: cli.ops.clone(), + must_match: cli.must_match, + force: cli.force, + } }; + + // never process PUSH db + fp.config.add_to_blacklist(PUSHDB_REGEX); + + // overwrite reftime found in config file with command line argument + if let Some(reftime) = cli.reftime { + fp.config.set_reftime(reftime); + } + + // output channels for each type of operation + let (flush_tx, flush_rx) = mpsc::channel(); + let (push_tx, push_rx) = mpsc::channel(); + + // threads write to tx + let output = Output { + flush: flush_tx, + push: push_tx, + }; + + if cli.leaf { + process_leaf(cli.dal_root, Arc::new(fp), output); + } else { + print_flushable_in_dal(&cli.dal_root, fp, output, cli.nthreads); + } + + // read from rx + write_outputs(cli.output_dir, cli.ops, flush_rx, push_rx, cli.nthreads); +} + +#[cfg(test)] +mod tests; diff --git a/obj_scanner/src/tests.rs b/obj_scanner/src/tests.rs new file mode 100644 index 00000000..3af84d15 --- /dev/null +++ b/obj_scanner/src/tests.rs @@ -0,0 +1,295 @@ +/* +Copyright (c) 2015, Los Alamos National Security, LLC +All rights reserved. + +Copyright 2015. Los Alamos National Security, LLC. This software was +produced under U.S. Government contract DE-AC52-06NA25396 for Los +Alamos National Laboratory (LANL), which is operated by Los Alamos +National Security, LLC for the U.S. Department of Energy. The +U.S. Government has rights to use, reproduce, and distribute this +software. NEITHER THE GOVERNMENT NOR LOS ALAMOS NATIONAL SECURITY, +LLC MAKES ANY WARRANTY, EXPRESS OR IMPLIED, OR ASSUMES ANY LIABILITY +FOR THE USE OF THIS SOFTWARE. If software is modified to produce +derivative works, such modified software should be clearly marked, so +as not to confuse it with the version available from LANL. 
+ +Additionally, redistribution and use in source and binary forms, with +or without modification, are permitted provided that the following +conditions are met: 1. Redistributions of source code must retain the +above copyright notice, this list of conditions and the following +disclaimer. + +2. Redistributions in binary form must reproduce the above copyright +notice, this list of conditions and the following disclaimer in the +documentation and/or other materials provided with the distribution. +3. Neither the name of Los Alamos National Security, LLC, Los Alamos +National Laboratory, LANL, the U.S. Government, nor the names of its +contributors may be used to endorse or promote products derived from +this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY LOS ALAMOS NATIONAL SECURITY, LLC AND +CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, +BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND +FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL LOS +ALAMOS NATIONAL SECURITY, LLC OR CONTRIBUTORS BE LIABLE FOR ANY +DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE +GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER +IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR +OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF +ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +----- +NOTE: +----- +MarFS is released under the BSD license. + +MarFS was reviewed and released by LANL under Los Alamos Computer Code +identifier: LA-CC-15-039. + +MarFS uses libaws4c for Amazon S3 object communication. The original +version is at https://aws.amazon.com/code/Amazon-S3/2601 and under the +LGPL license. LANL added functionality to the original work. 
The +original work plus LANL contributions is found at +https://github.com/jti-lanl/aws4c. + +GNU licenses can be found at http://www.gnu.org/licenses/. +*/ + +use crate::*; +use std::mem; +use tempfile::{TempDir, tempdir}; + +fn setup_dal() -> (TempDir, PathBuf) { + // DAL root + let root = tempdir().unwrap(); + + // create intermediate directories + let mut leaf = PathBuf::from(root.path().to_path_buf().to_owned()); + + for path_seg in PATH_SEGS { + leaf.push(String::from(*path_seg) + "0"); + let _ = fs::create_dir(&leaf); + } + + // return root to prevent destructor call + (root, leaf) +} + +fn setup_file(path: &PathBuf, name: &str, mtime: u64) { + // create pod/block/cap/scat/* + let mut filename = path.clone(); + filename = filename.join(name); + + let timestamp = SystemTime::UNIX_EPOCH + Duration::from_secs(mtime); + let utime = fs::FileTimes::new().set_modified(timestamp); + + let file = fs::File::create(&filename).unwrap(); + let _ = file.set_times(utime); +} + +fn setup_config(flush: bool, push: bool, force: bool) -> FlushPush { + let mut config_path = PathBuf::from(env!("CARGO_MANIFEST_DIR")); + config_path.push("example.config"); + + let mut fp = FlushPush { + config: config::Config::from_pathbuf(config_path), + ops: Ops { + flush: flush, + push: push, + }, + must_match: Some(Regex::new(".*").unwrap()), + force: force, + }; + + fp.config.set_reftime(SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs()); + fp.config.add_to_blacklist(PUSHDB_REGEX); + + fp +} + +fn setup_channels() -> (Output, (mpsc::Receiver, mpsc::Receiver)) { + let (flush_tx, flush_rx) = mpsc::channel::(); + let (push_tx, push_rx) = mpsc::channel::(); + + (Output { + flush: flush_tx, + push: push_tx, + }, (flush_rx, push_rx)) +} + +fn get_u64(path: &PathBuf, sql: &str) -> Result { + let mut dbname = path.clone(); + dbname.push(PUSHDB_NAME); + + let conn = Connection::open_with_flags(&dbname, + OpenFlags::SQLITE_OPEN_CREATE | + OpenFlags::SQLITE_OPEN_READ_WRITE)?; + 
+ conn.query_row::(sql, [], |row| row.get(0)) +} + +#[test] +fn process_leaf_correctness() { + let (_root, leaf) = setup_dal(); + let (tx, _rx) = setup_channels(); + + // push on empty leaf + { + let fp = Arc::new(setup_config(false, true, false)); + + // empty leaf + { + let mut count = 0; + + for _ in fs::read_dir(&leaf).unwrap() { + count += 1; + } + + assert_eq!(count, 0); + } + + // getting prev mtime fails because this is an empty leaf + process_leaf(leaf.clone(), fp.clone(), tx.clone()); + assert!(!get_u64(&leaf, "SELECT oldest FROM mtime;").is_ok(), "Should not have gotten an mtime"); + + // PUSHDB has been created + { + let mut count = 0; + + for _ in fs::read_dir(&leaf).unwrap() { + count += 1; + } + + assert_eq!(count, 1); + } + } + + // add a file to establish an mtime + static FILENAME: &str = "old_file"; + static FIRST_MTIME: u64 = 1000; + setup_file(&leaf, FILENAME, FIRST_MTIME); + + // push leaf + { + let fp = Arc::new(setup_config(false, true, false)); + + // getting prev mtime fails (again) because the previous run did not set it + process_leaf(leaf.clone(), fp.clone(), tx.clone()); + + // mtime will be set this time + assert_eq!(get_u64(&leaf, "SELECT oldest FROM mtime;").unwrap(), FIRST_MTIME); + + // file will be pushed because it was not selected for flush + assert_eq!(get_u64(&leaf, "SELECT COUNT(*) FROM push;").unwrap(), 1); + + // getting prev mtime should succeed + process_leaf(leaf.clone(), fp.clone(), tx.clone()); + + // force push file that has already been pushed + let fp = Arc::new(setup_config(false, true, true)); + process_leaf(leaf.clone(), fp.clone(), tx.clone()); + assert_eq!(get_u64(&leaf, "SELECT COUNT(*) FROM push;").unwrap(), 1); + } + + // ////////////////////////////////////////////////// + + // update mtime or else process_leaf() will + // think this file has already been flushed + static SECOND_MTIME: u64 = 1000000; + setup_file(&leaf, FILENAME, SECOND_MTIME); + + // push leaf + { + let fp = Arc::new(setup_config(false, 
true, false)); + + // will update row in PUSH db + process_leaf(leaf.clone(), fp.clone(), tx.clone()); + + // prev oldest mtime changed + assert_eq!(get_u64(&leaf, "SELECT oldest FROM mtime;").unwrap(), SECOND_MTIME); + + // still only 1 file in table + assert_eq!(get_u64(&leaf, "SELECT COUNT(*) FROM push;").unwrap(), 1); + } + + // flush leaf + { + let fp = Arc::new(setup_config(true, false, false)); + + // will delete row from PUSH db + process_leaf(leaf.clone(), fp.clone(), tx.clone()); + + // prev oldest mtime not changed + assert_eq!(get_u64(&leaf, "SELECT oldest FROM mtime;").unwrap(), SECOND_MTIME); + + // file is no longer scheduled for flushing (because it has been flushed) + assert_eq!(get_u64(&leaf, "SELECT COUNT(*) FROM push;").unwrap(), 0); + } +} + +#[test] +fn process_non_leaf_empty() { + let (root, leaf) = setup_dal(); + let fp = setup_config(false, false, false); + let (tx, _rx) = setup_channels(); + + let dal_root = PathBuf::from(root.path().to_path_buf().to_owned()); + print_flushable_in_dal(&dal_root, fp.clone(), tx.clone(), 1); + + let mut bad_root = leaf.clone(); + bad_root.push("non-existant"); + print_flushable_in_dal(&bad_root, fp.clone(), tx.clone(), 1); +} + +#[test] +fn readonly_pushdb() { + let dir = tempdir().unwrap(); + let mut path = PathBuf::from(dir.path().to_path_buf().to_owned()); + path.push("ro"); + + let _ = fs::OpenOptions::new().read(true).write(false).create(true).open(&path); + + assert!(!open_pushdb(&path).is_ok(), "open_pushdb should not have succeeded"); +} + +#[test] +fn write_output_files() { + let dir = tempdir().unwrap(); + let path = PathBuf::from(dir.path().to_path_buf().to_owned()); + + static FLUSH: &str = "flush"; + static PUSH: &str = "push"; + + let ops = Ops { + flush: true, + push: true, + }; + + let flush_contents = FLUSH.to_string(); + let (flush_tx, flush_rx) = mpsc::channel::(); + let _ = flush_tx.send(PathBuf::from(&flush_contents)); + mem::drop(flush_tx); + + let push_contents = PUSH.to_string(); + 
let (push_tx, push_rx) = mpsc::channel::(); + let _ = push_tx.send(PathBuf::from(&push_contents)); + mem::drop(push_tx); + + write_outputs(path.clone(), ops, flush_rx, push_rx, 1); + + { + let mut filename = path.clone(); + filename.push(FLUSH); + let contents = fs::read_to_string(filename).unwrap(); + assert_eq!(contents, flush_contents + "\n"); + } + + { + let mut filename = path.clone(); + filename.push(PUSH); + let contents = fs::read_to_string(filename).unwrap(); + assert_eq!(contents, push_contents + "\n"); + } +}