From ac01311ed2a0ff6afe603a18e8f1bf0abbabca22 Mon Sep 17 00:00:00 2001 From: Jason Lee Date: Thu, 7 Nov 2024 19:07:23 +0000 Subject: [PATCH] Marchive Object Scanner (Pusher/Flusher) Scan MarFS DAL and write out paths for flushing or pushing. Added config file to set up default file age reference time (can be overwritten in commandline) filesystem utilization thresholds object regex blacklist An SQLite database file is placed into each leaf directory to store paths of files that have been pushed but not flushed. Flushing them will remove the records from the database. Added single whitelist regexp that can be passed in for matching. Added --force flag to force flush/push even if not eligible. Added --leaf to process a single leaf dir instead of an entire DAL. This is an update to flush_ls. --- .github/workflows/ci.yml | 6 +- flush_ls/src/main.rs | 384 ----------------- flush_ls/src/tests.rs | 337 --------------- {flush_ls => obj_scanner}/Cargo.lock | 210 +++++++++- {flush_ls => obj_scanner}/Cargo.toml | 5 +- obj_scanner/example.config | 29 ++ obj_scanner/src/config.rs | 507 +++++++++++++++++++++++ obj_scanner/src/main.rs | 591 +++++++++++++++++++++++++++ obj_scanner/src/tests.rs | 295 +++++++++++++ 9 files changed, 1633 insertions(+), 731 deletions(-) delete mode 100644 flush_ls/src/main.rs delete mode 100644 flush_ls/src/tests.rs rename {flush_ls => obj_scanner}/Cargo.lock (62%) rename {flush_ls => obj_scanner}/Cargo.toml (69%) create mode 100644 obj_scanner/example.config create mode 100644 obj_scanner/src/config.rs create mode 100644 obj_scanner/src/main.rs create mode 100644 obj_scanner/src/tests.rs diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index c7a8f901..7c040dc8 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -50,7 +50,7 @@ jobs: cd $GITHUB_WORKSPACE # https://doc.rust-lang.org/cargo/guide/continuous-integration.html - flush_ls: + obj_scanner: runs-on: ubuntu-latest steps: - uses: actions/checkout@v4 @@ -60,8 +60,8 @@ jobs: - name: Build run: cargo build --verbose - working-directory: flush_ls + working-directory: obj_scanner - name: Test run: cargo test --verbose - working-directory: flush_ls + working-directory: obj_scanner diff --git a/flush_ls/src/main.rs b/flush_ls/src/main.rs deleted file mode 100644 index 61777a27..00000000 --- a/flush_ls/src/main.rs +++ /dev/null @@ -1,384 +0,0 @@ -/* -Copyright (c) 2015, Los Alamos National Security, LLC -All rights reserved. - -Copyright 2015. Los Alamos National Security, LLC. This software was -produced under U.S. Government contract DE-AC52-06NA25396 for Los -Alamos National Laboratory (LANL), which is operated by Los Alamos -National Security, LLC for the U.S. Department of Energy. The -U.S. Government has rights to use, reproduce, and distribute this -software. NEITHER THE GOVERNMENT NOR LOS ALAMOS NATIONAL SECURITY, -LLC MAKES ANY WARRANTY, EXPRESS OR IMPLIED, OR ASSUMES ANY LIABILITY -FOR THE USE OF THIS SOFTWARE. If software is modified to produce -derivative works, such modified software should be clearly marked, so -as not to confuse it with the version available from LANL. - -Additionally, redistribution and use in source and binary forms, with -or without modification, are permitted provided that the following -conditions are met: 1. Redistributions of source code must retain the -above copyright notice, this list of conditions and the following -disclaimer. - -2. Redistributions in binary form must reproduce the above copyright -notice, this list of conditions and the following disclaimer in the -documentation and/or other materials provided with the distribution. -3. Neither the name of Los Alamos National Security, LLC, Los Alamos -National Laboratory, LANL, the U.S. Government, nor the names of its -contributors may be used to endorse or promote products derived from -this software without specific prior written permission. - -THIS SOFTWARE IS PROVIDED BY LOS ALAMOS NATIONAL SECURITY, LLC AND -CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, -BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND -FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL LOS -ALAMOS NATIONAL SECURITY, LLC OR CONTRIBUTORS BE LIABLE FOR ANY -DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL -DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE -GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS -INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER -IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR -OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF -ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - ------ -NOTE: ------ -MarFS is released under the BSD license. - -MarFS was reviewed and released by LANL under Los Alamos Computer Code -identifier: LA-CC-15-039. - -MarFS uses libaws4c for Amazon S3 object communication. The original -version is at https://aws.amazon.com/code/Amazon-S3/2601 and under the -LGPL license. LANL added functionality to the original work. The -original work plus LANL contributions is found at -https://github.com/jti-lanl/aws4c. - -GNU licenses can be found at http://www.gnu.org/licenses/. -*/ - -use clap::Parser; -use std::collections::BTreeMap; -use std::fs; -use std::mem; -use std::path::PathBuf; -use std::sync::{Arc, mpsc}; -use std::thread; -use std::time::{Duration, SystemTime}; - -// path components to leaves of marfs data tree -const PATH_SEGS: &[&str] = &[ - "pod", - "block", - "cap", - "scat" -]; - -// use BTreeMap to remove duplicates and sort on keys -// do not manually allocate - use parse_user_thresholds -type Thresholds = BTreeMap; - -/** - * Select how old files are allowed to be given system - * utilization and thresholds - * - * Example: - * thresholds: - * 10 -> 60 - * 20 -> 1 - * - * If the utilization is less than or equal to 10%, files older than - * 60 seconds should be flushed. If the utilization is greater than - * 10% and less than or equal to 20%, files older than 1 second should - * be flushed. - * - * TODO: Change to BTreeMap::upper_bound once btree_cursors is merged. - * - * @param thresholds mapping of thresholds to file age limits - * @param utilization system utilization - * @return file age limit in seconds - */ -fn util2age(thresholds: &Thresholds, utilization: u8) -> u64 { - // return the age associated with the first threshold - // that is greater than or equal to the utilization - for (threshold, age) in thresholds.iter() { - if *threshold >= utilization { - return *age; - } - } - - // this line does double duty as a compiler silencer - // and as an invalid utilization value check - panic!("Error: Utilization percentage not found"); -} - -/** - * Process files under /pod[0-9]+/block[0-9]+/cap[0-9]+/scat[0-9]+/ - * if (reftime - file.atime) > age, print the file's path - * - * @param path /pod[0-9]+/block[0-9]+/cap[0-9]+/scat[0-9]+/ - * @param reftime a timestamp to compare atimes with - * @param thresholds mapping of thresholds to file age limits - * @return number of leaf files processed - */ -fn process_leaf(path: PathBuf, reftime: Arc, thresholds: Arc) -> u64 { - let entries = match fs::read_dir(&path) { - Ok(list) => list, - Err(error) => { - eprintln!("Warning: Could not read_dir {}: {}", path.display(), error); - return 0; - }, - }; - - // get the leaf's utilization - let util = unsafe { - use errno::errno; - use libc; - use std::ffi::CString; - use std::mem; - - let path_cstr = CString::new(path.display().to_string()).unwrap(); - let mut vfs_st: libc::statvfs = mem::zeroed(); - - if libc::statvfs(path_cstr.as_ptr(), &mut vfs_st as *mut libc::statvfs) < 0 { - println!("Warning: Getting utilization for {} failed: {}", path.display(), errno()); - return 0; - } - - (100 - vfs_st.f_bfree * 100 / vfs_st.f_blocks) as u8 - }; - - // figure out the file age limit - let age = util2age(&thresholds, util); - - let mut count = 0; - - // loop through leaf directory and find files older than the limit - for entry_res in entries { - let entry = match entry_res { - Ok(entry) => entry, - Err(error) => { - eprintln!("Warning: Could not get entry: {}", error); - continue; - }, - }; - - if let Ok(entry_type) = entry.file_type() { - let child = entry.path(); - if entry_type.is_file() { - if let Ok(st) = child.metadata() { - if let Ok(atime) = st.accessed() { - if let Ok(dur) = reftime.duration_since(atime) { - // older than allowed file age - print path for flushing - if dur.as_secs() > age { - println!("{}", child.display()); - count += 1; - } - } - } - } - } else { - eprintln!("Warning: {} is not a file", child.display()); - } - } - } - - return count; -} - -/** - * Find directories matching /pod[0-9]+/block[0-9]+/cap[0-9]+/scat[0-9]+ - * - * @param path and lower - * @param level which path segment is currently being processed - * @param reftime a timestamp to compare atimes with - * @param thresholds mapping of thresholds to file age limits - * @param tx push thread handles here - * @return number of paths that had the expected path segment pattern - */ -fn process_non_leaf(path: PathBuf, level: usize, - reftime: Arc, - thresholds: Arc, - tx: mpsc::Sender>) -> u64 { - if level == 4 { - // panic on > 4? - let _ = tx.send(thread::spawn(move || { - process_leaf(path, reftime, thresholds) - })); - return 0; - } - - let entries = match fs::read_dir(&path) { - Ok(list) => list, - Err(error) => { - eprintln!("Warning: Could not read_dir {}: {}", path.display(), error); - return 0; - }, - }; - - let expected = PATH_SEGS[level]; - let len = expected.len(); - - let mut count = 0; - - // find paths that match current marfs path segment - for entry_res in entries { - let entry = match entry_res { - Ok(entry) => entry, - Err(error) => { - eprintln!("Warning: Could not get entry: {}", error); - continue; - }, - }; - - let child = entry.path(); - - if child.is_dir() == false { - continue; - } - - // make sure current basename has expected path segment - if let Some(basename) = child.file_name() { - if len < basename.len() { - if let Some(basename_str) = basename.to_str() { - if &basename_str[0..len] != expected { - continue; - } - - if let Err(_) = &basename_str[len..].parse::() { - continue; - } - - let tx_clone = tx.clone(); - let reftime_arc = reftime.clone(); - let thresholds_arc = thresholds.clone(); - - let _ = tx.send(thread::spawn(move || { - process_non_leaf(child, level + 1, reftime_arc, thresholds_arc, tx_clone) - })); - - count += 1; - } - } - } - } - - count -} - -/** - * Recurse down to /pod[0-9]+/block[0-9]+/cap[0-9]+/scat[0-9]+ - * and find files that are older than the provided age - * - * @param dal_root - * @param reftime a timestamp to compare atimes with - * @param thresholds mapping of thresholds to file age limits - */ -fn print_flushable_in_dal(dal_root: &PathBuf, reftime: &SystemTime, thresholds: &Thresholds) { - let (tx, rx) = mpsc::channel(); - - let tx_clone = tx.clone(); - let path = dal_root.clone(); - let reftime_arc = Arc::new(*reftime); - let thresholds_arc = Arc::new(thresholds.clone()); - - let _ = tx.send(std::thread::spawn(move || { - process_non_leaf(path, 0, reftime_arc, thresholds_arc, tx_clone) - })); - - mem::drop(tx); - - while let Ok(thread) = rx.recv() { - let _ = thread.join(); - } -} - -/** - * Convert , strings from the commandline - * to integers and insert them into a map. - * - * Utilization is an integer representing utilization percentage. - * An integer is required because rust does not have an Ord trait - * defined for f32 and f64. https://stackoverflow.com/a/69117941/341683 - * - * Age is integer number of seconds since Jan 1, 1970 00:00:00 UTC. - * - * Example: - * - * ... ... 10,60 20,1 - * - * @param args a vector of strings parsed by clap - * @param delim separator between utilization and age - * @return a mapping of utilizations to file age limits - */ -fn parse_user_thresholds(args: &Vec, delim: char) -> Thresholds { - let mut thresholds = Thresholds::from([ - (0, u64::MAX), // if utilization is at 0%, don't flush anything - (100, 0), // if utilization is at 100%, flush everything - ]); - - for arg in args { - match arg.split_once(delim) { - Some((util_str, age_str)) => { - let util = match util_str.parse::() { - Ok(val) => val, - Err(error) => panic!("Error: Bad utilization string: '{}': {}", util_str, error), - }; - - if util > 100 { - panic!("Error: Utilization can be between 0% and 100%. Got '{}'", util); - } - - let age = match age_str.parse::() { - Ok(val) => val, - Err(error) => panic!("Error: Bad age string: '{}': {}", age_str, error), - }; - - thresholds.insert(util, age); - }, - None => panic!("Error: Bad , string: '{}'", arg), - } - } - - // check for monotonically decreasing file ages - let mut prev = thresholds.first_key_value(); - for (utilization, age) in thresholds.iter().skip(1) { - if age >= prev.unwrap().1 { - panic!("Error: File age must be strictly monotonically decreasing. Found {},{} -> {},{}", - prev.unwrap().0, prev.unwrap().1, utilization, age); - } - - prev = Some((&utilization, &age)) - } - - thresholds -} - -#[derive(Parser, Debug)] -#[command()] -struct Args { - #[arg(help="DAL root path")] - root: PathBuf, - - #[arg(help="Reference Timestamp (Seconds Since Epoch)")] - reftime: u64, - - #[arg(help="Comma separated utilization percentage (integer) and age (integer seconds) thresholds")] - thresholds: Vec, -} - -fn main() { - let args = Args::parse(); - - // get reference timestamp - let reftime = SystemTime::UNIX_EPOCH + Duration::from_secs(args.reftime); - - // convert user input to a map - let thresholds = parse_user_thresholds(&args.thresholds, ','); - - // find files older than age - print_flushable_in_dal(&args.root, &reftime, &thresholds); -} - -#[cfg(test)] -mod tests; diff --git a/flush_ls/src/tests.rs b/flush_ls/src/tests.rs deleted file mode 100644 index 4fe0aa03..00000000 --- a/flush_ls/src/tests.rs +++ /dev/null @@ -1,337 +0,0 @@ -/* -Copyright (c) 2015, Los Alamos National Security, LLC -All rights reserved. - -Copyright 2015. Los Alamos National Security, LLC. This software was -produced under U.S. Government contract DE-AC52-06NA25396 for Los -Alamos National Laboratory (LANL), which is operated by Los Alamos -National Security, LLC for the U.S. Department of Energy. The -U.S. Government has rights to use, reproduce, and distribute this -software. NEITHER THE GOVERNMENT NOR LOS ALAMOS NATIONAL SECURITY, -LLC MAKES ANY WARRANTY, EXPRESS OR IMPLIED, OR ASSUMES ANY LIABILITY -FOR THE USE OF THIS SOFTWARE. If software is modified to produce -derivative works, such modified software should be clearly marked, so -as not to confuse it with the version available from LANL. - -Additionally, redistribution and use in source and binary forms, with -or without modification, are permitted provided that the following -conditions are met: 1. Redistributions of source code must retain the -above copyright notice, this list of conditions and the following -disclaimer. - -2. Redistributions in binary form must reproduce the above copyright -notice, this list of conditions and the following disclaimer in the -documentation and/or other materials provided with the distribution. -3. Neither the name of Los Alamos National Security, LLC, Los Alamos -National Laboratory, LANL, the U.S. Government, nor the names of its -contributors may be used to endorse or promote products derived from -this software without specific prior written permission. - -THIS SOFTWARE IS PROVIDED BY LOS ALAMOS NATIONAL SECURITY, LLC AND -CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, -BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND -FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL LOS -ALAMOS NATIONAL SECURITY, LLC OR CONTRIBUTORS BE LIABLE FOR ANY -DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL -DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE -GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS -INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER -IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR -OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF -ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - ------ -NOTE: ------ -MarFS is released under the BSD license. - -MarFS was reviewed and released by LANL under Los Alamos Computer Code -identifier: LA-CC-15-039. - -MarFS uses libaws4c for Amazon S3 object communication. The original -version is at https://aws.amazon.com/code/Amazon-S3/2601 and under the -LGPL license. LANL added functionality to the original work. The -original work plus LANL contributions is found at -https://github.com/jti-lanl/aws4c. - -GNU licenses can be found at http://www.gnu.org/licenses/. -*/ - -#[cfg(test)] -mod tests { - use crate::*; - use tempfile::{TempDir, tempdir}; - - fn setup_dirs() -> (TempDir, PathBuf) { - // DAL root - let root = tempdir().unwrap(); - - // create intermediate directories - let mut path = PathBuf::from(root.path().to_path_buf().to_owned()); - - for path_seg in PATH_SEGS { - let numbered = PathBuf::from(String::from(*path_seg) + "0"); - path = path.join(numbered); - let _ = fs::create_dir(&path).unwrap(); - } - - // return root to prevent destructor call - (root, path) - } - - fn setup_file(path: &PathBuf, name: &str, atime: SystemTime) { - // create pod/block/cap/scat/* - let mut filename = path.to_owned(); - filename = filename.join(name); - - let utime = fs::FileTimes::new().set_accessed(atime); - - let file = fs::File::create(&filename).unwrap(); - file.set_times(utime).unwrap(); - } - - #[test] - fn process_leaf_good() { - let reftime = SystemTime::now(); - let (_root, path) = setup_dirs(); - - // create 2 files - setup_file(&path, "0", reftime - Duration::from_secs(1)); - setup_file(&path, "1", reftime - Duration::from_secs(24 * 60 * 60)); - - // find files older than 2 seconds - { - let thresholds = Thresholds::from([(100, 2)]); - assert_eq!(process_leaf(path.clone(), Arc::new(reftime), Arc::new(thresholds)), 1); - } - - // find files older than 0 seconds - { - let thresholds = Thresholds::from([(100, 0)]); - assert_eq!(process_leaf(path.clone(), Arc::new(reftime), Arc::new(thresholds)), 2); - } - } - - #[test] - fn process_leaf_file() { - let root = tempdir().unwrap(); - - // process_leaf should take in a directory path, not a file path - let mut path = PathBuf::from(root.path().to_path_buf().to_owned()); - path.push("file"); - let _ = fs::File::create(&path).unwrap(); - - let thresholds = Thresholds::from([(100, 0)]); - assert_eq!(process_leaf(path.clone(), Arc::new(SystemTime::now()), Arc::new(thresholds)), 0); - } - - #[test] - fn process_leaf_dir() { - let root = tempdir().unwrap(); - - // path should only have files under it - let mut path = PathBuf::from(root.path().to_path_buf().to_owned()); - path.push("dir"); - let _ = fs::create_dir(&path).unwrap(); - path.pop(); - - let thresholds = Thresholds::from([(100, 0)]); - assert_eq!(process_leaf(path.clone(), Arc::new(SystemTime::now()), Arc::new(thresholds)), 0); - } - - #[test] - fn process_non_leaf_bad_dir() { - let root = tempdir().unwrap(); - let (tx, _) = mpsc::channel(); - - // the provided path does not exist - let mut path = PathBuf::from(root.path().to_path_buf().to_owned()); - path.push("non-existant-path"); - - let children = process_non_leaf(path, 0, Arc::new(SystemTime::now()), Arc::new(Thresholds::new()), tx); - assert_eq!(children, 0); - } - - #[test] - fn process_non_leaf_bad_dir_name() { - let root = tempdir().unwrap(); - let (tx, _) = mpsc::channel(); - let level = 0; - - // subdirectory does exist, but is not an expected path segment - let mut path = PathBuf::from(root.path().to_path_buf().to_owned()); - path.push(String::from("a") + PATH_SEGS[level]); - let _ = fs::create_dir(&path).unwrap(); - path.pop(); - - let children = process_non_leaf(path, level, Arc::new(SystemTime::now()), Arc::new(Thresholds::new()), tx); - assert_eq!(children, 0); - } - - #[test] - fn process_non_leaf_partial_match() { - let root = tempdir().unwrap(); - let (tx, _) = mpsc::channel(); - let level = 0; - - // subdirectory prefix matches, but suffix is not an integer - let mut path = PathBuf::from(root.path().to_path_buf().to_owned()); - path.push(String::from(PATH_SEGS[level]) + "a"); - let _ = fs::create_dir(&path).unwrap(); - path.pop(); - - let children = process_non_leaf(path, level, Arc::new(SystemTime::now()), Arc::new(Thresholds::new()), tx); - assert_eq!(children, 0); - } - - #[test] - fn process_non_leaf_file() { - let root = tempdir().unwrap(); - let (tx, _) = mpsc::channel(); - - // there should only be subdirectories, not files - let mut path = PathBuf::from(root.path().to_path_buf().to_owned()); - path.push("file"); - let _ = fs::File::create(&path).unwrap(); - path.pop(); - - let children = process_non_leaf(path, 0, Arc::new(SystemTime::now()), Arc::new(Thresholds::new()), tx); - assert_eq!(children, 0); - } - - #[test] - fn print_flushable_in_dal_good() { - let (root, _) = setup_dirs(); - let thresholds = parse_user_thresholds(&vec!["0,2".to_string()], ','); - print_flushable_in_dal(&root.path().to_path_buf(), &SystemTime::UNIX_EPOCH, &thresholds); - } - - #[test] - fn user_threshold_good_single() { - let args = vec![ - "1,1".to_string(), - ]; - - let thresholds = parse_user_thresholds(&args, ','); - assert_eq!(thresholds.len(), 3); - assert_eq!(thresholds.get(&1), Some(&1)); - } - - #[test] - fn user_threshold_good_multiple() { - let args = vec![ - "1,2".to_string(), - "2,1".to_string(), - ]; - - let thresholds = parse_user_thresholds(&args, ','); - assert_eq!(thresholds.len(), 4); - assert_eq!(thresholds.get(&1), Some(&2)); - assert_eq!(thresholds.get(&2), Some(&1)); - } - - #[test] - fn user_threshold_good_repeat() { - let args = vec![ - "0,1".to_string(), // overwrite's default 0% utilization - ]; - - let thresholds = parse_user_thresholds(&args, ','); - assert_eq!(thresholds.len(), 2); - assert_eq!(thresholds.get(&0), Some(&1)); - } - - #[test] - #[should_panic(expected="Error: Bad , string: ''")] - fn user_threshold_empty() { - parse_user_thresholds(&vec!["".to_string()], ','); - } - - #[test] - #[should_panic(expected="Error: Bad age string: '': cannot parse integer from empty string")] - fn user_threshold_digit_empty() { - parse_user_thresholds(&vec!["1,".to_string()], ','); - } - - #[test] - #[should_panic(expected="Error: Bad utilization string: '': cannot parse integer from empty string")] - fn user_threshold_empty_digit() { - parse_user_thresholds(&vec![",1".to_string()], ','); - } - - #[test] - #[should_panic(expected="Error: Bad utilization string: 'a': invalid digit found in string")] - fn user_threshold_alpha_empty() { - parse_user_thresholds(&vec!["a,".to_string()], ','); - } - - #[test] - #[should_panic(expected="Error: Bad utilization string: '': cannot parse integer from empty string")] - fn user_threshold_empty_alpha() { - parse_user_thresholds(&vec![",a".to_string()], ','); - } - - #[test] - #[should_panic(expected="Error: Utilization can be between 0% and 100%. Got '200'")] - fn user_threshold_too_big() { - parse_user_thresholds(&vec!["200,".to_string()], ','); - } - - #[test] - #[should_panic(expected="Error: File age must be strictly monotonically decreasing. Found 10,1 -> 20,1")] - fn user_threshold_same_ages() { - parse_user_thresholds(&vec!["10,1".to_string(), "20,1".to_string()], ','); - } - - #[test] - #[should_panic(expected="Error: File age must be strictly monotonically decreasing. Found 10,1 -> 20,2")] - fn user_threshold_increasing_ages() { - parse_user_thresholds(&vec!["10,1".to_string(), "20,2".to_string()], ','); - } - - #[test] - fn util2age_good() { - // low utilization -> flush older files - // high utilization -> flush recent files - let args = vec![ - "10,90".to_string(), - "20,80".to_string(), - "30,70".to_string(), - "40,60".to_string(), - "50,50".to_string(), - "60,40".to_string(), - "70,30".to_string(), - "80,20".to_string(), - "90,10".to_string(), - ]; - - let thresholds = parse_user_thresholds(&args, ','); - assert_eq!(thresholds.len(), 11); - - assert_eq!(util2age(&thresholds, 05), 90); - assert_eq!(util2age(&thresholds, 15), 80); - assert_eq!(util2age(&thresholds, 25), 70); - assert_eq!(util2age(&thresholds, 35), 60); - assert_eq!(util2age(&thresholds, 45), 50); - assert_eq!(util2age(&thresholds, 55), 40); - assert_eq!(util2age(&thresholds, 65), 30); - assert_eq!(util2age(&thresholds, 75), 20); - assert_eq!(util2age(&thresholds, 85), 10); - assert_eq!(util2age(&thresholds, 95), 00); - } - - #[test] - fn util2age_empty() { - let thresholds = parse_user_thresholds(&vec![], ','); - assert_eq!(thresholds.len(), 2); - util2age(&thresholds, 0); - } - - #[test] - #[should_panic(expected = "Error: Utilization percentage not found")] - fn util2age_gt_100() { - let thresholds = parse_user_thresholds(&vec![], ','); - assert_eq!(thresholds.len(), 2); - util2age(&thresholds, 200); - } -} diff --git a/flush_ls/Cargo.lock b/obj_scanner/Cargo.lock similarity index 62% rename from flush_ls/Cargo.lock rename to obj_scanner/Cargo.lock index 75967574..a9840c2c 100644 --- a/flush_ls/Cargo.lock +++ b/obj_scanner/Cargo.lock @@ -2,6 +2,27 @@ # It is not intended for manual editing. version = 3 +[[package]] +name = "ahash" +version = "0.8.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e89da841a80418a9b391ebaea17f5c112ffaaa96f621d2c285b5174da76b9011" +dependencies = [ + "cfg-if", + "once_cell", + "version_check", + "zerocopy", +] + +[[package]] +name = "aho-corasick" +version = "1.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e60d3430d3a69478ad0993f19238d2df97c507009a52b3c10addcd7f6bcb916" +dependencies = [ + "memchr", +] + [[package]] name = "anstream" version = "0.6.17" @@ -57,6 +78,15 @@ version = "2.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b048fb63fd8b5923fc5aa7b340d8e156aec7ec02f0c78fa8a6ddc2613f6f71de" +[[package]] +name = "cc" +version = "1.1.37" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "40545c26d092346d8a8dab71ee48e7685a7a9cba76e634790c215b41a4a7b4cf" +dependencies = [ + "shlex", +] + [[package]] name = "cfg-if" version = "1.0.0" @@ -119,6 +149,18 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "fallible-iterator" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2acce4a10f12dc2fb14a218589d4f1f62ef011b2d0cc4b3cb1bba8e94da14649" + +[[package]] +name = "fallible-streaming-iterator" +version = "0.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7360491ce676a36bf9bb3c56c1aa791658183a54d2744120f27285738d90465a" + [[package]] name = "fastrand" version = "2.1.1" @@ -126,13 +168,21 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e8c02a5121d4ea3eb16a80748c74f5549a5665e4c21333c6098f283870fbdea6" [[package]] -name = "flush_ls" -version = "0.1.0" +name = "hashbrown" +version = "0.14.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" dependencies = [ - "clap", - "errno", - "libc", - "tempfile", + "ahash", +] + +[[package]] +name = "hashlink" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6ba4ff7128dee98c7dc9794b6a411377e1404dba1c97deb8d1a55297bd25d8af" +dependencies = [ + "hashbrown", ] [[package]] @@ -141,6 +191,12 @@ version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" +[[package]] +name = "hermit-abi" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d231dfb89cfffdbc30e7fc41579ed6066ad03abda9e567ccafae602b97ec5024" + [[package]] name = "is_terminal_polyfill" version = "1.70.1" @@ -153,18 +209,64 @@ version = "0.2.161" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8e9489c2807c139ffd9c1794f4af0ebe86a828db53ecdc7fea2111d0fed085d1" +[[package]] +name = "libsqlite3-sys" +version = "0.30.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2e99fb7a497b1e3339bc746195567ed8d3e24945ecd636e3619d20b9de9e9149" +dependencies = [ + "cc", + "pkg-config", + "vcpkg", +] + [[package]] name = "linux-raw-sys" version = "0.4.14" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "78b3ae25bc7c8c38cec158d1f2757ee79e9b3740fbc7ccf0e59e4b08d793fa89" +[[package]] +name = "memchr" +version = "2.7.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "78ca9ab1a0babb1e7d5695e3530886289c18cf2f87ec19a575a0abdce112e3a3" + +[[package]] +name = "num_cpus" +version = "1.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4161fcb6d602d4d2081af7c3a45852d875a03dd337a6bfdd6e06407b61342a43" +dependencies = [ + "hermit-abi", + "libc", +] + +[[package]] +name = "obj_scanner" +version = "0.1.0" +dependencies = [ + "clap", + "errno", + "libc", + "regex", + "rusqlite", + "tempfile", + "threadpool", +] + [[package]] name = "once_cell" version = "1.20.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1261fe7e33c73b354eab43b1273a57c8f967d0391e80353e51f764ac02cf6775" +[[package]] +name = "pkg-config" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "953ec861398dccce10c670dfeaf3ec4911ca479e9c02154b3a215178c5f566f2" + [[package]] name = "proc-macro2" version = "1.0.89" @@ -183,6 +285,49 @@ dependencies = [ "proc-macro2", ] +[[package]] +name = "regex" +version = "1.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b544ef1b4eac5dc2db33ea63606ae9ffcfac26c1416a2806ae0bf5f56b201191" +dependencies = [ + "aho-corasick", + "memchr", + "regex-automata", + "regex-syntax", +] + +[[package]] +name = "regex-automata" +version = "0.4.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "368758f23274712b504848e9d5a6f010445cc8b87a7cdb4d7cbee666c1288da3" +dependencies = [ + "aho-corasick", + "memchr", + "regex-syntax", +] + +[[package]] +name = "regex-syntax" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b15c43186be67a4fd63bee50d0303afffcef381492ebe2c5d87f324e1b8815c" + +[[package]] +name = "rusqlite" +version = "0.32.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7753b721174eb8ff87a9a0e799e2d7bc3749323e773db92e0984debb00019d6e" +dependencies = [ + "bitflags", + "fallible-iterator", + "fallible-streaming-iterator", + "hashlink", + "libsqlite3-sys", + "smallvec", +] + [[package]] name = "rustix" version = "0.38.37" @@ -196,6 +341,18 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "shlex" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" + +[[package]] +name = "smallvec" +version = "1.13.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3c5e1a9a646d36c3599cd173a41282daf47c44583ad367b8e6837255952e5c67" + [[package]] name = "strsim" version = "0.11.1" @@ -226,6 +383,15 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "threadpool" +version = "1.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d050e60b33d41c19108b32cea32164033a9013fe3b46cbd4457559bfbf77afaa" +dependencies = [ + "num_cpus", +] + [[package]] name = "unicode-ident" version = "1.0.13" @@ -238,6 +404,18 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" +[[package]] +name = "vcpkg" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" + +[[package]] +name = "version_check" +version = "0.9.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a" + [[package]] name = "windows-sys" version = "0.52.0" @@ -319,3 +497,23 @@ name = "windows_x86_64_msvc" version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec" + +[[package]] +name = "zerocopy" +version = "0.7.35" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1b9b4fd18abc82b8136838da5d50bae7bdea537c574d8dc1a34ed098d6c166f0" +dependencies = [ + "zerocopy-derive", +] + +[[package]] +name = "zerocopy-derive" +version = "0.7.35" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fa4f8080344d4671fb4e831a13ad1e68092748387dfc4f55e356242fae12ce3e" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] diff --git a/flush_ls/Cargo.toml b/obj_scanner/Cargo.toml similarity index 69% rename from flush_ls/Cargo.toml rename to obj_scanner/Cargo.toml index fc76f2bd..7ea87761 100644 --- a/flush_ls/Cargo.toml +++ b/obj_scanner/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "flush_ls" +name = "obj_scanner" version = "0.1.0" edition = "2021" @@ -9,4 +9,7 @@ edition = "2021" clap = { version = "4.5.20", features = ["derive"] } errno = "0.3.9" libc = "0.2.161" +regex = "1.11.1" +rusqlite = { version = "0.32.1", features = ["bundled"] } tempfile = "3.13.0" +threadpool = "1.8.1" diff --git a/obj_scanner/example.config b/obj_scanner/example.config new file mode 100644 index 00000000..71faba45 --- /dev/null +++ b/obj_scanner/example.config @@ -0,0 +1,29 @@ +# General Format: +# Label: value +# +# order does not matter +# + +# reftime: +# last reftime will be used +# can be overwridden by commandline argument +reftime: 0 + +# threshold: , +# multiple threshold labels will be combined into a set of unique pairs +# duplicate utilization values will be overwritten by the last duplicate +threshold: 10,90 +threshold: 20,80 +threshold: 30,70 +threshold: 40,60 +threshold: 50,50 +threshold: 60,40 +threshold: 70,30 +threshold: 80,20 +threshold: 90,10 + +# blacklist: +# multiple blacklist labels will be aggregated +# all blacklist regexps will be run in the order listed +# duplicates are not removed +blacklist: ^.*\.inprog$ diff --git a/obj_scanner/src/config.rs b/obj_scanner/src/config.rs new file mode 100644 index 00000000..3bd4a441 --- /dev/null +++ b/obj_scanner/src/config.rs @@ -0,0 +1,507 @@ +/* +Copyright (c) 2015, Los Alamos National Security, LLC +All rights reserved. + +Copyright 2015. Los Alamos National Security, LLC. This software was +produced under U.S. Government contract DE-AC52-06NA25396 for Los +Alamos National Laboratory (LANL), which is operated by Los Alamos +National Security, LLC for the U.S. Department of Energy. The +U.S. Government has rights to use, reproduce, and distribute this +software. NEITHER THE GOVERNMENT NOR LOS ALAMOS NATIONAL SECURITY, +LLC MAKES ANY WARRANTY, EXPRESS OR IMPLIED, OR ASSUMES ANY LIABILITY +FOR THE USE OF THIS SOFTWARE. If software is modified to produce +derivative works, such modified software should be clearly marked, so +as not to confuse it with the version available from LANL. + +Additionally, redistribution and use in source and binary forms, with +or without modification, are permitted provided that the following +conditions are met: 1. Redistributions of source code must retain the +above copyright notice, this list of conditions and the following +disclaimer. + +2. Redistributions in binary form must reproduce the above copyright +notice, this list of conditions and the following disclaimer in the +documentation and/or other materials provided with the distribution. +3. Neither the name of Los Alamos National Security, LLC, Los Alamos +National Laboratory, LANL, the U.S. Government, nor the names of its +contributors may be used to endorse or promote products derived from +this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY LOS ALAMOS NATIONAL SECURITY, LLC AND +CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, +BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND +FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL LOS +ALAMOS NATIONAL SECURITY, LLC OR CONTRIBUTORS BE LIABLE FOR ANY +DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE +GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER +IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR +OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF +ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +----- +NOTE: +----- +MarFS is released under the BSD license. + +MarFS was reviewed and released by LANL under Los Alamos Computer Code +identifier: LA-CC-15-039. + +MarFS uses libaws4c for Amazon S3 object communication. The original +version is at https://aws.amazon.com/code/Amazon-S3/2601 and under the +LGPL license. LANL added functionality to the original work. The +original work plus LANL contributions is found at +https://github.com/jti-lanl/aws4c. + +GNU licenses can be found at http://www.gnu.org/licenses/. +*/ + + +use regex::Regex; +use std::collections::BTreeMap; +use std::fs::File; +use std::io::{BufRead, BufReader}; +use std::path::PathBuf; +use std::time::{Duration, SystemTime, SystemTimeError}; + +/** + * This struct represents the contents of a config file. + * + * The config file contains key-value pairs separated by the ':' character. + * Key-value pairs may appear in any order. + * Duplicate key-value pairs may appear. How they are handled depends on the key. + */ +#[derive(Clone)] +pub struct Config { + // Reference Timestamp (Seconds Since Unix Epoch) + reftime: SystemTime, + + // Mapping from system utilization to allowed file age + // using BTreeMap to remove duplicates and sort on keys + thresholds: BTreeMap, + + // Regular expressions for basenames to ignore + blacklist: Vec, +} + +impl Config { + const LABEL_DELIM: char = ':'; + const THRESHOLD_DELIM: char = ','; + + fn new() -> Config { + Self { + reftime: SystemTime::UNIX_EPOCH, + thresholds: BTreeMap::from([ + (0, u64::MAX), // if utilization is at 0%, don't flush anything + (100, 0), // if utilization is at 100%, flush everything + ]), + blacklist: Vec::new(), + } + } + + pub fn from_pathbuf(path: PathBuf) -> Config { + let mut config = Config::new(); + + let file = match File::open(&path) { + Ok(f) => f, + Err(msg) => panic!("Error: Could not open flush config file {}: {}", + path.display(), msg), + }; + + // parse file + // labels can appear in any order + // processing function determines if the new value is overwrites the old value or is aggregated + for line_res in BufReader::new(file).lines() { + let line = match line_res { + Ok(line) => line, + Err(msg) => panic!("Error: Could not read line from {}: {}", + path.display(), msg), + }; + + config.process_line(&line); + } + + config.verify_thresholds(); + + config + } + + fn process_line(&mut self, line: &str) { + if line.len() == 0 { + return; + } + + if line.chars().nth(0) == Some('#') { + return; + } + + match line.split_once(Self::LABEL_DELIM) { + Some((label, value)) => { + match label.trim() { + "reftime" => self.set_reftime_str(value), + "threshold" => self.add_to_threshold(value), + "blacklist" => self.add_to_blacklist(value), + _ => panic!("Error: Unknown config label: {}", label), + }; + + }, + None => panic!("Error: Did not find separator in line: {}", line), + }; + } + + pub fn set_reftime(&mut self, reftime: u64) { + self.reftime = SystemTime::UNIX_EPOCH + Duration::from_secs(reftime); + } + + fn set_reftime_str(&mut self, time_str: &str) { + match time_str.trim().parse::() { + Ok(reftime) => self.set_reftime(reftime), + Err(msg) => panic!("Error: Could not convert {} into a timestamp: {}", + time_str, msg), + }; + } + + /** + * Convert threshold strings from the config file to integers and + * insert them into a map. + * + * Format: + * threshold: , + * + * Utilization is an integer representing utilization percentage. + * An integer is required because rust does not have an Ord trait + * defined for f32 and f64. https://stackoverflow.com/a/69117941/341683 + * + * Age is integer number of seconds since Jan 1, 1970 00:00:00 UTC. + * + * Examples: + * + * threshold: 10,60 + * threshold:20, 1 + */ + fn add_to_threshold(&mut self, pair_str: &str) { + match pair_str.trim().split_once(Self::THRESHOLD_DELIM) { + Some((util_str, age_str)) => { + let util = match util_str.trim().parse::() { + Ok(val) => val, + Err(msg) => panic!("Error: Bad utilization string: '{}': {}", util_str, msg), + }; + + if util > 100 { + panic!("Error: Utilization can be between 0% and 100%. Got '{}'", util); + } + + let age = match age_str.trim().parse::() { + Ok(val) => val, + Err(msg) => panic!("Error: Bad age string: '{}': {}", age_str, msg), + }; + + self.thresholds.insert(util, age); + }, + None => panic!("Error: Bad , string: '{}'", pair_str), + } + } + + pub fn add_to_blacklist(&mut self, regex: &str) { + match Regex::new(regex.trim()) { + Ok(re) => self.blacklist.push(re), + Err(msg) => panic!("Error: Bad regex pattern: {}: {}", regex, msg), + }; + } + + /** + * Check for monotonically decreasing file ages. + * + * Call this function after the entire config file has been + * processed + */ + fn verify_thresholds(&self) { + let mut prev = self.thresholds.first_key_value(); + for (utilization, age) in self.thresholds.iter().skip(1) { + if age >= prev.unwrap().1 { + panic!("Error: File age must be strictly monotonically decreasing. Found {},{} -> {},{}", + prev.unwrap().0, prev.unwrap().1, utilization, age); + } + + prev = Some((&utilization, &age)) + } + } + + // this will error if the input is later than the reftime + // (duration is negative) - propogate the error to let the + // caller handle it + pub fn file_age(&self, timestamp: SystemTime) -> Result { + self.reftime.duration_since(timestamp) + } + + /** + * Select how old files are allowed to be given system + * utilization and thresholds + * + * Example: + * thresholds: + * 10 -> 60 + * 20 -> 1 + * + * If the utilization is less than or equal to 10%, files older than + * 60 seconds should be flushed. If the utilization is greater than + * 10% and less than or equal to 20%, files older than 1 second should + * be flushed. + * + * TODO: Change to BTreeMap::upper_bound once btree_cursors is merged. + * + * @param utilization system utilization + * @return file age limit in seconds + */ + pub fn util2age(&self, utilization: u8) -> u64 { + // return the age associated with the first threshold + // that is greater than or equal to the utilization + for (threshold, age) in self.thresholds.iter() { + if *threshold >= utilization { + return *age; + } + } + + // this line does double duty as a compiler silencer + // and as an invalid utilization value check + panic!("Error: Utilization percentage not found"); + } + + pub fn is_blacklisted(&self, file_name: &str) -> bool { + for regex in &self.blacklist { + if regex.is_match(file_name) { + return true; + } + } + + false + } +} + +#[cfg(test)] +mod tests { + use crate::config::Config; + use std::path::PathBuf; + use std::time::SystemTime; + use tempfile::NamedTempFile; + + #[test] + #[should_panic(expected = "Error: Could not convert a into a timestamp: invalid digit found in string")] + fn set_reftime_str_bad() { + let mut config = Config::new(); + config.set_reftime_str("a"); + } + + #[test] + fn set_reftime() { + let mut config = Config::new(); + config.set_reftime(0); + + assert_eq!(config.reftime, SystemTime::UNIX_EPOCH); + } + + #[test] + fn newer_reftime() { + let mut config = Config::new(); + config.set_reftime(0); + + assert_eq!(config.file_age(SystemTime::now()).is_err(), true); + } + + #[test] + fn user_threshold_good_single() { + let mut config = Config::new(); + config.add_to_threshold("1,1"); + + assert_eq!(config.thresholds.len(), 3); + assert_eq!(config.thresholds.get(&1), Some(&1)); + } + + #[test] + fn user_threshold_good_multiple() { + let mut config = Config::new(); + config.add_to_threshold("1,2"); + config.add_to_threshold("2,1"); + + assert_eq!(config.thresholds.len(), 4); + assert_eq!(config.thresholds.get(&1), Some(&2)); + assert_eq!(config.thresholds.get(&2), Some(&1)); + } + + #[test] + fn user_threshold_good_repeat() { + let mut config = Config::new(); + config.add_to_threshold("0,1"); + + assert_eq!(config.thresholds.len(), 2); + assert_eq!(config.thresholds.get(&0), Some(&1)); + } + + #[test] + #[should_panic(expected="Error: Bad , string: ''")] + fn user_threshold_empty() { + let mut config = Config::new(); + config.add_to_threshold(""); + } + + #[test] + #[should_panic(expected="Error: Bad age string: '': cannot parse integer from empty string")] + fn user_threshold_digit_empty() { + let mut config = Config::new(); + config.add_to_threshold("1,"); + } + + #[test] + #[should_panic(expected="Error: Bad utilization string: '': cannot parse integer from empty string")] + fn user_threshold_empty_digit() { + let mut config = Config::new(); + config.add_to_threshold(",1"); + } + + #[test] + #[should_panic(expected="Error: Bad utilization string: 'a': invalid digit found in string")] + fn user_threshold_alpha_empty() { + let mut config = Config::new(); + config.add_to_threshold("a,"); + } + + #[test] + #[should_panic(expected="Error: Bad utilization string: '': cannot parse integer from empty string")] + fn user_threshold_empty_alpha() { + let mut config = Config::new(); + config.add_to_threshold(",a"); + } + + #[test] + #[should_panic(expected="Error: Utilization can be between 0% and 100%. Got '200'")] + fn user_threshold_too_big() { + let mut config = Config::new(); + config.add_to_threshold("200,"); + } + + #[test] + #[should_panic(expected="Error: File age must be strictly monotonically decreasing. Found 10,1 -> 20,1")] + fn user_threshold_same_ages() { + let mut config = Config::new(); + config.add_to_threshold("10,1"); + config.add_to_threshold("20,1"); + config.verify_thresholds(); + } + + #[test] + #[should_panic(expected="Error: File age must be strictly monotonically decreasing. Found 10,1 -> 20,2")] + fn user_threshold_increasing_ages() { + let mut config = Config::new(); + config.add_to_threshold("10,1"); + config.add_to_threshold("20,2"); + config.verify_thresholds(); + } + + #[test] + #[should_panic(expected = "Error: Utilization percentage not found")] + fn util2age_gt_100() { + let config = Config::new(); + let _ = config.util2age(200); + } + + #[test] + #[should_panic(expected = "Error: Bad regex pattern: \\: regex parse error:")] + fn bad_regex() { + let mut config = Config::new(); + config.add_to_blacklist("\\"); + } + + #[test] + fn no_blacklist() { + let config = Config::new(); + + assert_eq!(config.is_blacklisted(""), false); + assert_eq!(config.is_blacklisted("test"), false); + assert_eq!(config.is_blacklisted("match"), false); + assert_eq!(config.is_blacklisted("matching"), false); + } + + #[test] + fn blacklist() { + let mut config = Config::new(); + config.add_to_blacklist("^match.+$"); + + assert_eq!(config.is_blacklisted(""), false); + assert_eq!(config.is_blacklisted("test"), false); + assert_eq!(config.is_blacklisted("match"), false); + assert_eq!(config.is_blacklisted("matching"), true); + } + + #[test] + #[should_panic(expected = "Error: Did not find separator in line: bad label")] + fn missing_separator() { + let mut config = Config::new(); + config.process_line("bad label"); + } + + #[test] + #[should_panic(expected = "Error: Unknown config label: bad label")] + fn bad_label() { + let mut config = Config::new(); + config.process_line("bad label: "); + } + + #[test] + #[should_panic(expected = "Error: Could not open flush config file")] + fn nonexistant_config() { + let file = NamedTempFile::new().unwrap(); + let path = PathBuf::from(file.path()); + let _ = file.close(); + + let _ = Config::from_pathbuf(path); + } + + #[test] + fn empty_config() { + let config = Config::new(); + + assert_eq!(config.thresholds.len(), 2); + assert_eq!(config.blacklist.len(), 0); + } + + #[test] + fn example_config() { + let mut path = PathBuf::from(env!("CARGO_MANIFEST_DIR")); + path.push("example.config"); + + let config = Config::from_pathbuf(path); + + assert_eq!(config.reftime, SystemTime::UNIX_EPOCH); + + assert_eq!(config.thresholds.len(), 11); + + // get raw values + assert_eq!(config.thresholds.get(&000), Some(&u64::MAX)); + assert_eq!(config.thresholds.get(&010), Some(&90)); + assert_eq!(config.thresholds.get(&020), Some(&80)); + assert_eq!(config.thresholds.get(&030), Some(&70)); + assert_eq!(config.thresholds.get(&040), Some(&60)); + assert_eq!(config.thresholds.get(&050), Some(&50)); + assert_eq!(config.thresholds.get(&060), Some(&40)); + assert_eq!(config.thresholds.get(&070), Some(&30)); + assert_eq!(config.thresholds.get(&080), Some(&20)); + assert_eq!(config.thresholds.get(&090), Some(&10)); + assert_eq!(config.thresholds.get(&100), Some(&00)); + + // get file age given utilization + assert_eq!(config.util2age(00), u64::MAX); + assert_eq!(config.util2age(05), 90); + assert_eq!(config.util2age(15), 80); + assert_eq!(config.util2age(25), 70); + assert_eq!(config.util2age(35), 60); + assert_eq!(config.util2age(45), 50); + assert_eq!(config.util2age(55), 40); + assert_eq!(config.util2age(65), 30); + assert_eq!(config.util2age(75), 20); + assert_eq!(config.util2age(85), 10); + assert_eq!(config.util2age(95), 00); + + assert_eq!(config.blacklist.len(), 1); + } +} diff --git a/obj_scanner/src/main.rs b/obj_scanner/src/main.rs new file mode 100644 index 00000000..0e45f99f --- /dev/null +++ b/obj_scanner/src/main.rs @@ -0,0 +1,591 @@ +/* +Copyright (c) 2015, Los Alamos National Security, LLC +All rights reserved. + +Copyright 2015. Los Alamos National Security, LLC. This software was +produced under U.S. Government contract DE-AC52-06NA25396 for Los +Alamos National Laboratory (LANL), which is operated by Los Alamos +National Security, LLC for the U.S. Department of Energy. The +U.S. Government has rights to use, reproduce, and distribute this +software. NEITHER THE GOVERNMENT NOR LOS ALAMOS NATIONAL SECURITY, +LLC MAKES ANY WARRANTY, EXPRESS OR IMPLIED, OR ASSUMES ANY LIABILITY +FOR THE USE OF THIS SOFTWARE. If software is modified to produce +derivative works, such modified software should be clearly marked, so +as not to confuse it with the version available from LANL. + +Additionally, redistribution and use in source and binary forms, with +or without modification, are permitted provided that the following +conditions are met: 1. Redistributions of source code must retain the +above copyright notice, this list of conditions and the following +disclaimer. + +2. Redistributions in binary form must reproduce the above copyright +notice, this list of conditions and the following disclaimer in the +documentation and/or other materials provided with the distribution. +3. Neither the name of Los Alamos National Security, LLC, Los Alamos +National Laboratory, LANL, the U.S. Government, nor the names of its +contributors may be used to endorse or promote products derived from +this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY LOS ALAMOS NATIONAL SECURITY, LLC AND +CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, +BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND +FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL LOS +ALAMOS NATIONAL SECURITY, LLC OR CONTRIBUTORS BE LIABLE FOR ANY +DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE +GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER +IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR +OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF +ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +----- +NOTE: +----- +MarFS is released under the BSD license. + +MarFS was reviewed and released by LANL under Los Alamos Computer Code +identifier: LA-CC-15-039. + +MarFS uses libaws4c for Amazon S3 object communication. The original +version is at https://aws.amazon.com/code/Amazon-S3/2601 and under the +LGPL license. LANL added functionality to the original work. The +original work plus LANL contributions is found at +https://github.com/jti-lanl/aws4c. + +GNU licenses can be found at http://www.gnu.org/licenses/. +*/ + +use clap::{Args, Parser}; +use errno; +use regex::Regex; +use rusqlite::{Connection, OpenFlags}; +use std::cmp::min; +use std::fs; +use std::io::Write; +use std::path::PathBuf; +use std::sync::{Arc, mpsc}; +use std::time::{Duration, SystemTime}; +use threadpool::ThreadPool; + +mod config; + +#[derive(Debug, Args, Clone)] +#[group(required=true, multiple=true)] +struct Ops { + #[arg(long, help="Print leaf files to flush")] + flush: bool, + + #[arg(long, help="Print leaf files to push")] + push: bool, +} + +#[derive(Clone)] +struct FlushPush { + config: config::Config, + ops: Ops, + must_match: Option, + force: bool, +} + +// list of paths to write to output files +#[derive(Clone)] +struct Output { + push: mpsc::Sender, + flush: mpsc::Sender, +} + +// path components to leaves of marfs data tree +static PATH_SEGS: &[&str] = &[ + "pod", + "block", + "cap", + "scat" +]; + +// ////////////////////////////////////// +// Constants for database containing files that have been pushed +// +// If a file is selected for pushing, log it here as though it has +// been pushed so that it will not show up again next time this is run +// +// If a file is selected for flushing, it will be removed from the +// database +static PUSHDB_NAME: &str = "push.db"; + +// pattern for blacklisting this file +static PUSHDB_REGEX: &str = "^push\\.db$"; + +/** + * Open and/or create the PUSH database + * Tables are set up if they don't exist + * + * @param path /pod[0-9]+/block[0-9]+/cap[0-9]+/scat[0-9]+/ + * @return a handle to the PUSH database + */ +fn open_pushdb(path: &PathBuf) -> Result { + let mut dbname = path.clone(); + dbname.push(PUSHDB_NAME); + + match Connection::open_with_flags(&dbname, + OpenFlags::SQLITE_OPEN_CREATE | + OpenFlags::SQLITE_OPEN_READ_WRITE) { + Ok(conn) => { + // these can't really fail + let _ = conn.execute_batch("CREATE TABLE IF NOT EXISTS push (name TEXT PRIMARY KEY, mtime int64);"); + let _ = conn.execute_batch("CREATE TABLE IF NOT EXISTS mtime (oldest int64);"); + Ok(conn) + }, + Err(msg) => Err(format!("Could not open PUSH database {}: {}", + dbname.display(), msg)), + } +} +// ////////////////////////////////////// + +/** + * Get the filesystem utilization of the provided path + * using statvfs, rounded down to the nearest integer. + * + * @param path /pod[0-9]+/block[0-9]+/cap[0-9]+/scat[0-9]+/ + * @return utilization + */ +fn get_utilization(path: &PathBuf) -> Result { + // get the leaf's utilization + unsafe { + use libc; + use std::ffi::CString; + use std::mem; + + let path_cstr = CString::new(path.display().to_string()).unwrap(); + let mut vfs_st: libc::statvfs = mem::zeroed(); + + if libc::statvfs(path_cstr.as_ptr(), &mut vfs_st as *mut libc::statvfs) < 0 { + return Err(errno::errno()); + } + + Ok((100 - vfs_st.f_bfree * 100 / vfs_st.f_blocks) as u8) + } +} + +/** + * Process files under /pod[0-9]+/block[0-9]+/cap[0-9]+/scat[0-9]+/ + * + * if this file's basename is allowed + * if FLUSH + * if (reftime - file.mtime) > age + * print the file's path and delete it from the PUSH db + * + * if file was not flushed, record min(mtime) + * + * if PUSH + * insert the path into the PUSH db and print the file's path + * + * @param path /pod[0-9]+/block[0-9]+/cap[0-9]+/scat[0-9]+/ + * @param fp flush/push config + * @param output send flush and push tx channel endpoints + */ +fn process_leaf(path: PathBuf, fp: Arc, output: Output) { + // open/create the PUSH database + let pushdb = match open_pushdb(&path) { + Ok(conn) => conn, + Err(msg) => { eprintln!("Error: {}", msg); return; }, + }; + + // get previously existing oldest mtime, if one exists + // + // if a previous mtime was not found, default to UNIX_EPOCH: new + // mtimes must come after UNIX_EPOCH, so every file will pass the + // mtime check + let prev_oldest_mtime = SystemTime::UNIX_EPOCH + + match pushdb.query_row::("SELECT oldest FROM mtime;", [], |row| row.get(0)) { + Ok(val) => Duration::from_secs(val), + Err(_) => Duration::from_secs(0), + }; + + // get the leaf's utilization + let util = match get_utilization(&path) { + Ok(val) => val, + Err(msg) => { eprintln!("Warning: Getting utilization for {} failed: {}", path.display(), msg); return; }, + }; + + // figure out the file age limit + // get here to not do repeated searches + let max_age = fp.config.util2age(util); + + // use a time that all files should have been created before + // assumes all system clocks are not off by more than a year + let future_mtime = SystemTime::now() + Duration::from_secs(60 * 60 * 24 * 365); + let mut curr_oldest_mtime = future_mtime.clone(); + + let entries = match fs::read_dir(&path) { + Ok(list) => list, + Err(msg) => { eprintln!("Error: Could not read_dir {}: {}", path.display(), msg); return; }, + }; + + // loop through leaf directory and find files older than the limit + for entry_res in entries { + let entry = match entry_res { + Ok(entry) => entry, + Err(msg) => { eprintln!("Warning: Could not get entry: {}", msg); continue; }, + }; + + let child = entry.path(); + + if let Ok(entry_type) = entry.file_type() { + // only expecting files under leaf directories + if !entry_type.is_file() { + eprintln!("Warning: {} is not a file. Skipping.", child.display()); + continue; + } + } else { + eprintln!("Warning: Could not get type of {}. Skipping.", child.display()); + continue; + } + + if let Ok(st) = child.metadata() { + if let Ok(mtime) = st.modified() { + if let Ok(file_age) = fp.config.file_age(mtime) { + let basename = child.file_name().unwrap().to_str().unwrap(); + + // basename must match in order to be processed + if let Some(whitelist) = &fp.must_match { + if !whitelist.is_match(&basename) { + continue; + } + } + + // do not process blacklisted files + if fp.config.is_blacklisted(&basename) { + continue; + } + + // If the FLUSH flag was specified, target object + // components will be selected if their mtime + // value is older than the value corresponding to + // the current FS fullness percentage, as + // specified in the config file. + // + // Do actual FLUSH later in order to track mtime + // + // Can mtime < prev_oldest_mtime happen? + let flush = fp.ops.flush && + (fp.force || ((mtime >= prev_oldest_mtime) && (file_age.as_secs() > max_age))); + + // keep track of the oldest mtime value amongst + // all encountered files, excluding those for + // which a FLUSH op is targeted, regardless of + // whether they match any white/blacklist object + // regex. + if !flush && (mtime < curr_oldest_mtime) { + curr_oldest_mtime = mtime; + } + + // process FLUSH here + if flush { + let _ = output.flush.send(child.clone()); + let _ = pushdb.execute(&format!("DELETE FROM push WHERE name == '{}';", basename), []); + continue; // flushed files are done being processed + } + + if !fp.ops.push { + continue; + } + + let msecs = mtime.duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs(); + + // if the path is not already in the PUSH db, queue it for printing + if let Ok(_) = pushdb.execute(&format!("INSERT INTO push (name, mtime) VALUES ('{}', {});", + basename, msecs), []) { + let _ = output.push.send(child.clone()); + continue; // done + } + + // file is already listed - check the timestamp + match pushdb.query_row::(&format!("SELECT mtime FROM push WHERE name == '{}';", + basename), [], |row| row.get(0)) { + Ok(old_mtime) => { + let old_mtime = SystemTime::UNIX_EPOCH + Duration::from_secs(old_mtime); + if old_mtime < mtime { + // attempt to update PUSH db + if let Err(msg) = pushdb.execute(&format!("UPDATE push SET mtime = {} WHERE name == '{}';", + msecs, basename), []) { + eprintln!("Error: Could not update mtime of previously pushed file {}: {}", + child.display(), msg); + } + + // add to PUSH file no matter what + let _ = output.push.send(child.clone()); + } else { + // add to PUSH file anyways + if fp.force { + let _ = output.push.send(child.clone()); + } + } + }, + Err(msg) => { + eprintln!("Warning: Could not get existing mtime for {}, so pushing again: {}", + child.display(), msg); + let _ = output.push.send(child.clone()); + }, + }; + } + } + } + } + + // if the oldest mtime was updated (this directory has at least 1 file), write it to PUSH db + if (curr_oldest_mtime > prev_oldest_mtime) && + (curr_oldest_mtime != future_mtime) { + let msecs = curr_oldest_mtime.duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs(); + + // remove all rows just in case there's more than 1 row + let _ = pushdb.execute_batch("DELETE FROM mtime;"); + + // insert the new oldest mtime + let _ = pushdb.execute(&format!("INSERT INTO mtime (oldest) VALUES ({});", msecs), []); + } +} + +/** + * Find directories matching /pod[0-9]+/block[0-9]+/cap[0-9]+/scat[0-9]+ + * + * @param path and lower + * @param level which path segment is currently being processed + * @param fp flush/push config + * @param output send flush and push tx channel endpoints + * @param threadpool the threadpool that is processing child work + */ +fn process_non_leaf(path: PathBuf, level: usize, + fp: Arc, + output: Output, + threadpool: ThreadPool) { + if level == 4 { + // panic on > 4? + threadpool.execute(move || { + process_leaf(path, fp, output); + }); + return; + } + + let entries = match fs::read_dir(&path) { + Ok(list) => list, + Err(msg) => { + eprintln!("Warning: Could not read_dir {}: {}", path.display(), msg); + return; + }, + }; + + let expected = PATH_SEGS[level]; + let len = expected.len(); + + // find paths that match current marfs path segment + for entry_res in entries { + let entry = match entry_res { + Ok(entry) => entry, + Err(msg) => { + eprintln!("Warning: Could not get entry: {}", msg); + continue; + }, + }; + + let child = entry.path(); + + if child.is_dir() == false { + continue; + } + + // make sure current basename has expected path segment + if let Some(basename) = child.file_name() { + if len < basename.len() { + if let Some(basename_str) = basename.to_str() { + if &basename_str[0..len] != expected { + continue; + } + + if let Err(_) = &basename_str[len..].parse::() { + continue; + } + + let fp_clone = fp.clone(); + let threadpool_clone = threadpool.clone(); + let output_clone = output.clone(); + + threadpool.execute(move ||{ + process_non_leaf(child, level + 1, + fp_clone, + output_clone, + threadpool_clone); + }); + } + } + } + } +} + +/** + * Recurse down to /pod[0-9]+/block[0-9]+/cap[0-9]+/scat[0-9]+ + * and find files that are older than the provided age + * + * @param dal_root + * @param fp flush/push config + * @param output send flush and push tx channel endpoints + * @param nthreads the number of the threads that should be used to process this DAL root + */ +fn print_flushable_in_dal(dal_root: &PathBuf, + fp: FlushPush, + output: Output, + nthreads: usize) { + let path = dal_root.clone(); + let fp_arc = Arc::new(fp); + + // no need to clone output here + + let pool = ThreadPool::new(nthreads); + let pool_clone = pool.clone(); + + pool.execute(move || { + process_non_leaf(path, 0, fp_arc, output, pool_clone); + }); + + // Can't parallelize writes during processing: + // - need to drop original tx, but if write threads interrupt + // walk threads, they might complete before walk threads + // + // - write threads would sit on threads for the entire lifetime + // of the threadpool, blocking any work queued behind them, + // preventing completion + // - this threadpool doesn't steal work + // + // - spawning (nthreads - 2) walk threads and then spawning 2 + // more threads for writing does not seem correct + // + // - spawning nthreads walk threads and then spawning 2 more + // threads for writing does not seem correct either + // + // - not writing in walk threads in order to use locking of + // mpsc + // + // - threading rx clones/references at the end of each leaf to + // write paths in parallel doesn't make sense + // - just pass around file handles and locks + // + + pool.join(); +} + +/** + * Iterate through rx channel endpoints and write the paths to the + * appropriate files. + * + * @param output_dir directory to place files at + * @param flush the rx channel endpoint containing flush paths + * @param push the rx channel endpoint containing push paths + * @param nthreads number of threads allowed to use to write output + */ +fn write_outputs(output_dir: PathBuf, ops: Ops, + flush: mpsc::Receiver, + push: mpsc::Receiver, + nthreads: usize) { + fn write_paths(pool: &ThreadPool, + output_dir: PathBuf, + name: &str, + paths: mpsc::Receiver) { + let name_str = String::from(name); + pool.execute(move || { + let mut output_path = output_dir; + output_path.push(name_str); + let mut output_file = match fs::File::create(output_path.clone()) { + Ok(file) => file, + Err(msg) => panic!("Error: Could not open flush file {}: {}", output_path.display(), msg), + }; + + for path in paths { + let _ = output_file.write_all((path.display().to_string() + "\n").as_bytes()); + } + }); + } + + let nfiles = if ops.flush { 1 } else { 0 } + if ops.push { 1 } else { 0 }; + let pool = ThreadPool::new(min(nthreads, nfiles)); + if ops.flush { + write_paths(&pool, output_dir.clone(), "flush", flush); + } + if ops.push { + write_paths(&pool, output_dir.clone(), "push", push); + } + pool.join(); +} + +#[derive(Parser, Debug)] +#[command()] +struct Cli { + #[arg(help="DAL root path")] + dal_root: PathBuf, + + #[arg(help="Path to config file")] + config_file: PathBuf, + + #[arg(help="Output Directory")] + output_dir: PathBuf, + + #[clap(flatten)] + ops: Ops, + + #[arg(short, long, value_name="regexp")] + #[arg(help="Regex pattern which objects must match in order to be eligible for any operation")] + must_match: Option, + + #[arg(short, long, default_value="1", help="Thread Count")] + nthreads: usize, + + #[arg(short, long, help="Force operation even if condition is not met")] + force: bool, + + #[arg(short, long, help="Overwrite config file reftime")] + reftime: Option, + + #[arg(short, long, help="Input path is a leaf dir")] + leaf: bool +} + +fn main() { + let cli = Cli::parse(); + + let mut fp = { FlushPush { + config: config::Config::from_pathbuf(cli.config_file.clone()), + ops: cli.ops.clone(), + must_match: cli.must_match, + force: cli.force, + } }; + + // never process PUSH db + fp.config.add_to_blacklist(PUSHDB_REGEX); + + // overwrite reftime found in config file with command line argument + if let Some(reftime) = cli.reftime { + fp.config.set_reftime(reftime); + } + + // output channels for each type of operation + let (flush_tx, flush_rx) = mpsc::channel(); + let (push_tx, push_rx) = mpsc::channel(); + + // threads write to tx + let output = Output { + flush: flush_tx, + push: push_tx, + }; + + if cli.leaf { + process_leaf(cli.dal_root, Arc::new(fp), output); + } else { + print_flushable_in_dal(&cli.dal_root, fp, output, cli.nthreads); + } + + // read from rx + write_outputs(cli.output_dir, cli.ops, flush_rx, push_rx, cli.nthreads); +} + +#[cfg(test)] +mod tests; diff --git a/obj_scanner/src/tests.rs b/obj_scanner/src/tests.rs new file mode 100644 index 00000000..3af84d15 --- /dev/null +++ b/obj_scanner/src/tests.rs @@ -0,0 +1,295 @@ +/* +Copyright (c) 2015, Los Alamos National Security, LLC +All rights reserved. + +Copyright 2015. Los Alamos National Security, LLC. This software was +produced under U.S. Government contract DE-AC52-06NA25396 for Los +Alamos National Laboratory (LANL), which is operated by Los Alamos +National Security, LLC for the U.S. Department of Energy. The +U.S. Government has rights to use, reproduce, and distribute this +software. NEITHER THE GOVERNMENT NOR LOS ALAMOS NATIONAL SECURITY, +LLC MAKES ANY WARRANTY, EXPRESS OR IMPLIED, OR ASSUMES ANY LIABILITY +FOR THE USE OF THIS SOFTWARE. If software is modified to produce +derivative works, such modified software should be clearly marked, so +as not to confuse it with the version available from LANL. + +Additionally, redistribution and use in source and binary forms, with +or without modification, are permitted provided that the following +conditions are met: 1. Redistributions of source code must retain the +above copyright notice, this list of conditions and the following +disclaimer. + +2. Redistributions in binary form must reproduce the above copyright +notice, this list of conditions and the following disclaimer in the +documentation and/or other materials provided with the distribution. +3. Neither the name of Los Alamos National Security, LLC, Los Alamos +National Laboratory, LANL, the U.S. Government, nor the names of its +contributors may be used to endorse or promote products derived from +this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY LOS ALAMOS NATIONAL SECURITY, LLC AND +CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, +BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND +FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL LOS +ALAMOS NATIONAL SECURITY, LLC OR CONTRIBUTORS BE LIABLE FOR ANY +DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE +GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER +IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR +OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF +ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +----- +NOTE: +----- +MarFS is released under the BSD license. + +MarFS was reviewed and released by LANL under Los Alamos Computer Code +identifier: LA-CC-15-039. + +MarFS uses libaws4c for Amazon S3 object communication. The original +version is at https://aws.amazon.com/code/Amazon-S3/2601 and under the +LGPL license. LANL added functionality to the original work. The +original work plus LANL contributions is found at +https://github.com/jti-lanl/aws4c. + +GNU licenses can be found at http://www.gnu.org/licenses/. +*/ + +use crate::*; +use std::mem; +use tempfile::{TempDir, tempdir}; + +fn setup_dal() -> (TempDir, PathBuf) { + // DAL root + let root = tempdir().unwrap(); + + // create intermediate directories + let mut leaf = PathBuf::from(root.path().to_path_buf().to_owned()); + + for path_seg in PATH_SEGS { + leaf.push(String::from(*path_seg) + "0"); + let _ = fs::create_dir(&leaf); + } + + // return root to prevent destructor call + (root, leaf) +} + +fn setup_file(path: &PathBuf, name: &str, mtime: u64) { + // create pod/block/cap/scat/* + let mut filename = path.clone(); + filename = filename.join(name); + + let timestamp = SystemTime::UNIX_EPOCH + Duration::from_secs(mtime); + let utime = fs::FileTimes::new().set_modified(timestamp); + + let file = fs::File::create(&filename).unwrap(); + let _ = file.set_times(utime); +} + +fn setup_config(flush: bool, push: bool, force: bool) -> FlushPush { + let mut config_path = PathBuf::from(env!("CARGO_MANIFEST_DIR")); + config_path.push("example.config"); + + let mut fp = FlushPush { + config: config::Config::from_pathbuf(config_path), + ops: Ops { + flush: flush, + push: push, + }, + must_match: Some(Regex::new(".*").unwrap()), + force: force, + }; + + fp.config.set_reftime(SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs()); + fp.config.add_to_blacklist(PUSHDB_REGEX); + + fp +} + +fn setup_channels() -> (Output, (mpsc::Receiver, mpsc::Receiver)) { + let (flush_tx, flush_rx) = mpsc::channel::(); + let (push_tx, push_rx) = mpsc::channel::(); + + (Output { + flush: flush_tx, + push: push_tx, + }, (flush_rx, push_rx)) +} + +fn get_u64(path: &PathBuf, sql: &str) -> Result { + let mut dbname = path.clone(); + dbname.push(PUSHDB_NAME); + + let conn = Connection::open_with_flags(&dbname, + OpenFlags::SQLITE_OPEN_CREATE | + OpenFlags::SQLITE_OPEN_READ_WRITE)?; + + conn.query_row::(sql, [], |row| row.get(0)) +} + +#[test] +fn process_leaf_correctness() { + let (_root, leaf) = setup_dal(); + let (tx, _rx) = setup_channels(); + + // push on empty leaf + { + let fp = Arc::new(setup_config(false, true, false)); + + // empty leaf + { + let mut count = 0; + + for _ in fs::read_dir(&leaf).unwrap() { + count += 1; + } + + assert_eq!(count, 0); + } + + // getting prev mtime fails because this is an empty leaf + process_leaf(leaf.clone(), fp.clone(), tx.clone()); + assert!(!get_u64(&leaf, "SELECT oldest FROM mtime;").is_ok(), "Should not have gotten an mtime"); + + // PUSHDB has been created + { + let mut count = 0; + + for _ in fs::read_dir(&leaf).unwrap() { + count += 1; + } + + assert_eq!(count, 1); + } + } + + // add a file to establish an mtime + static FILENAME: &str = "old_file"; + static FIRST_MTIME: u64 = 1000; + setup_file(&leaf, FILENAME, FIRST_MTIME); + + // push leaf + { + let fp = Arc::new(setup_config(false, true, false)); + + // getting prev mtime fails (again) because the previous run did not set it + process_leaf(leaf.clone(), fp.clone(), tx.clone()); + + // mtime will be set this time + assert_eq!(get_u64(&leaf, "SELECT oldest FROM mtime;").unwrap(), FIRST_MTIME); + + // file will be pushed because it was not selected for flush + assert_eq!(get_u64(&leaf, "SELECT COUNT(*) FROM push;").unwrap(), 1); + + // getting prev mtime should succeed + process_leaf(leaf.clone(), fp.clone(), tx.clone()); + + // force push file that has already been pushed + let fp = Arc::new(setup_config(false, true, true)); + process_leaf(leaf.clone(), fp.clone(), tx.clone()); + assert_eq!(get_u64(&leaf, "SELECT COUNT(*) FROM push;").unwrap(), 1); + } + + // ////////////////////////////////////////////////// + + // update mtime or else process_leaf() will + // think this file has already been flushed + static SECOND_MTIME: u64 = 1000000; + setup_file(&leaf, FILENAME, SECOND_MTIME); + + // push leaf + { + let fp = Arc::new(setup_config(false, true, false)); + + // will update row in PUSH db + process_leaf(leaf.clone(), fp.clone(), tx.clone()); + + // prev oldest mtime changed + assert_eq!(get_u64(&leaf, "SELECT oldest FROM mtime;").unwrap(), SECOND_MTIME); + + // still only 1 file in table + assert_eq!(get_u64(&leaf, "SELECT COUNT(*) FROM push;").unwrap(), 1); + } + + // flush leaf + { + let fp = Arc::new(setup_config(true, false, false)); + + // will delete row from PUSH db + process_leaf(leaf.clone(), fp.clone(), tx.clone()); + + // prev oldest mtime not changed + assert_eq!(get_u64(&leaf, "SELECT oldest FROM mtime;").unwrap(), SECOND_MTIME); + + // file is no longer scheduled for flushing (because it has been flushed) + assert_eq!(get_u64(&leaf, "SELECT COUNT(*) FROM push;").unwrap(), 0); + } +} + +#[test] +fn process_non_leaf_empty() { + let (root, leaf) = setup_dal(); + let fp = setup_config(false, false, false); + let (tx, _rx) = setup_channels(); + + let dal_root = PathBuf::from(root.path().to_path_buf().to_owned()); + print_flushable_in_dal(&dal_root, fp.clone(), tx.clone(), 1); + + let mut bad_root = leaf.clone(); + bad_root.push("non-existant"); + print_flushable_in_dal(&bad_root, fp.clone(), tx.clone(), 1); +} + +#[test] +fn readonly_pushdb() { + let dir = tempdir().unwrap(); + let mut path = PathBuf::from(dir.path().to_path_buf().to_owned()); + path.push("ro"); + + let _ = fs::OpenOptions::new().read(true).write(false).create(true).open(&path); + + assert!(!open_pushdb(&path).is_ok(), "open_pushdb should not have succeeded"); +} + +#[test] +fn write_output_files() { + let dir = tempdir().unwrap(); + let path = PathBuf::from(dir.path().to_path_buf().to_owned()); + + static FLUSH: &str = "flush"; + static PUSH: &str = "push"; + + let ops = Ops { + flush: true, + push: true, + }; + + let flush_contents = FLUSH.to_string(); + let (flush_tx, flush_rx) = mpsc::channel::(); + let _ = flush_tx.send(PathBuf::from(&flush_contents)); + mem::drop(flush_tx); + + let push_contents = PUSH.to_string(); + let (push_tx, push_rx) = mpsc::channel::(); + let _ = push_tx.send(PathBuf::from(&push_contents)); + mem::drop(push_tx); + + write_outputs(path.clone(), ops, flush_rx, push_rx, 1); + + { + let mut filename = path.clone(); + filename.push(FLUSH); + let contents = fs::read_to_string(filename).unwrap(); + assert_eq!(contents, flush_contents + "\n"); + } + + { + let mut filename = path.clone(); + filename.push(PUSH); + let contents = fs::read_to_string(filename).unwrap(); + assert_eq!(contents, push_contents + "\n"); + } +}