From 3c68b84a392715a8ecfc210ce0728e1648cbd5f2 Mon Sep 17 00:00:00 2001 From: Benjamin Nguyen Date: Tue, 7 Nov 2023 10:28:27 -0800 Subject: [PATCH] parallel traversal algorithm --- Cargo.lock | 54 ++++++++++++--- Cargo.toml | 1 + src/disk/mod.rs | 50 +++++++++----- src/file/inode.rs | 2 +- src/file/mod.rs | 14 ++-- src/main.rs | 10 +-- src/tree/mod.rs | 160 +++++++++++++++++++++++++++++++++++-------- src/tree/parallel.rs | 104 ++++++++++++++++++++++++++++ src/tree/walker.rs | 39 +++++++++++ src/user/mod.rs | 11 ++- 10 files changed, 371 insertions(+), 74 deletions(-) create mode 100644 src/tree/parallel.rs create mode 100644 src/tree/walker.rs diff --git a/Cargo.lock b/Cargo.lock index 22688c9..3361571 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,6 +2,19 @@ # It is not intended for manual editing. version = 3 +[[package]] +name = "ahash" +version = "0.8.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "91429305e9f0a25f6205c5b8e0d2db09e0708a7a6df0f42212bb56c32c8ac97a" +dependencies = [ + "cfg-if", + "getrandom", + "once_cell", + "version_check", + "zerocopy", +] + [[package]] name = "aho-corasick" version = "0.7.20" @@ -49,7 +62,7 @@ checksum = "b9ccdd8f2a161be9bd5c023df56f1b2a0bd1d83872ae53b71a84a12c9bf6e842" dependencies = [ "proc-macro2", "quote", - "syn 2.0.12", + "syn 2.0.38", ] [[package]] @@ -229,6 +242,7 @@ dependencies = [ name = "erdtree" version = "3.1.2" dependencies = [ + "ahash", "ansi_term", "anyhow", "chrono", @@ -638,18 +652,18 @@ dependencies = [ [[package]] name = "proc-macro2" -version = "1.0.52" +version = "1.0.69" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1d0e1ae9e836cc3beddd63db0df682593d7e2d3d891ae8c9083d2113e1744224" +checksum = "134c189feb4956b20f6f547d2cf727d4c0fe06722b20a0eec87ed445a97f92da" dependencies = [ "unicode-ident", ] [[package]] name = "quote" -version = "1.0.26" +version = "1.0.33" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = 
"4424af4bf778aae2051a77b60283332f386554255d722233d09fbfc7e30da2fc" +checksum = "5267fca4496028628a95160fc423a33e8b2e6af8a5302579e322e4b520293cae" dependencies = [ "proc-macro2", ] @@ -810,9 +824,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.12" +version = "2.0.38" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "79d9531f94112cfc3e4c8f5f02cb2b58f72c97b7efd85f70203cc6d8efda5927" +checksum = "e96b79aaa137db8f61e26363a0c9b47d8b4ec75da28b7d1d614c2303e232408b" dependencies = [ "proc-macro2", "quote", @@ -868,7 +882,7 @@ checksum = "f9456a42c5b0d803c8cd86e73dd7cc9edd429499f37a3550d286d5e86720569f" dependencies = [ "proc-macro2", "quote", - "syn 2.0.12", + "syn 2.0.38", ] [[package]] @@ -966,7 +980,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn 2.0.12", + "syn 2.0.38", "wasm-bindgen-shared", ] @@ -988,7 +1002,7 @@ checksum = "54681b18a46765f095758388f2d0cf16eb8d4169b639ab575a8f5693af210c7b" dependencies = [ "proc-macro2", "quote", - "syn 2.0.12", + "syn 2.0.38", "wasm-bindgen-backend", "wasm-bindgen-shared", ] @@ -1185,3 +1199,23 @@ name = "windows_x86_64_msvc" version = "0.48.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1a515f5799fe4961cb532f983ce2b23082366b898e52ffbce459c86f67c8378a" + +[[package]] +name = "zerocopy" +version = "0.7.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8cd369a67c0edfef15010f980c3cbe45d7f651deac2cd67ce097cd801de16557" +dependencies = [ + "zerocopy-derive", +] + +[[package]] +name = "zerocopy-derive" +version = "0.7.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c2f140bda219a26ccc0cdb03dba58af72590c53b22642577d88a927bc5c87d6b" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.38", +] diff --git a/Cargo.toml b/Cargo.toml index d5b903c..bddb952 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,6 +24,7 @@ name = "erd" path = "src/main.rs" [dependencies] +ahash = "0.8.6" 
ansi_term = "0.12.1" anyhow = "1.0.75" chrono = { version = "0.4.24", default-features = false, features = ["clock", "std"] } diff --git a/src/disk/mod.rs b/src/disk/mod.rs index 84b4974..de5f612 100644 --- a/src/disk/mod.rs +++ b/src/disk/mod.rs @@ -1,8 +1,9 @@ -use crate::{error::prelude::*, user::enums::BytePresentation}; +use crate::user::enums::BytePresentation; use ignore::DirEntry; use std::{ fmt::{self, Display}, - fs::Metadata, + fs::{self, Metadata}, + io, ops::AddAssign, }; @@ -92,35 +93,52 @@ impl Usage { /// Gets the word count. Words are delimited by a whitespace or a sequence of whitespaces. /// Directories are initialized to 0. The `follow` argument determines whether or not to query the /// symlink target, otherwise the symlink will have a word count of 0. - pub fn init_word_count(data: &DirEntry, metadata: &Metadata, follow: bool) -> Result { + pub fn init_word_count( + data: &DirEntry, + metadata: &Metadata, + follow: bool, + ) -> Result { if metadata.is_dir() || (metadata.is_symlink() && !follow) { return Ok(Self::WordCount(0)); } - let word_count = std::fs::read_to_string(data.path()) - .into_report(ErrorCategory::Internal) - .map(|data| data.split_whitespace().count())?; + let word_count = + std::fs::read_to_string(data.path()).map(|data| data.split_whitespace().count())?; - u64::try_from(word_count) - .into_report(ErrorCategory::Internal) - .map(Self::WordCount) + let word_count = u64::try_from(word_count).map_or_else( + |e| { + log::warn!("Usage::init_word_count {e}"); + Self::WordCount(word_count as u64) + }, + Self::WordCount, + ); + + Ok(word_count) } /// Gets the line count. Lines are delimited by the new-line ASCII char. Directories are /// initialized to 0. The `follow` argument determines whether or not to query the symlink /// target, otherwise the symlink will have a count of 0. 
- pub fn init_line_count(data: &DirEntry, metadata: &Metadata, follow: bool) -> Result { + pub fn init_line_count( + data: &DirEntry, + metadata: &Metadata, + follow: bool, + ) -> Result { if metadata.is_dir() || (metadata.is_symlink() && !follow) { return Ok(Self::LineCount(0)); } - let line_count = std::fs::read_to_string(data.path()) - .into_report(ErrorCategory::Internal) - .map(|data| data.lines().count())?; + let line_count = fs::read_to_string(data.path()).map(|data| data.lines().count())?; + + let line_count = u64::try_from(line_count).map_or_else( + |e| { + log::warn!("Usage::init_line_count {e}"); + Self::LineCount(line_count as u64) + }, + Self::LineCount, + ); - u64::try_from(line_count) - .into_report(ErrorCategory::Internal) - .map(Self::WordCount) + Ok(line_count) } /// Gets the underlying numeric value representing the disk usage diff --git a/src/file/inode.rs b/src/file/inode.rs index bb7078f..f0b25b9 100644 --- a/src/file/inode.rs +++ b/src/file/inode.rs @@ -16,7 +16,7 @@ impl Inode { } #[derive(Debug, thiserror::Error)] -#[error("Insufficient information to compute inode")] +#[error("Insufficient information to compute inode.")] pub struct INodeError; impl TryFrom<&Metadata> for Inode { diff --git a/src/file/mod.rs b/src/file/mod.rs index d4261a7..116ac45 100644 --- a/src/file/mod.rs +++ b/src/file/mod.rs @@ -1,17 +1,17 @@ use crate::{ disk, - error::prelude::*, user::{enums::Metric, Context}, }; use ignore::DirEntry; use std::{ fs::{self, Metadata}, + io, ops::Deref, }; /// Concerned with querying information about a file's underlying inode. pub mod inode; -use inode::Inode; +use inode::{INodeError, Inode}; /// Erdtree's wrapper around [`DirEntry`], it's metadata ([`Metadata`]). Also contains disk usage /// information of files. Directories will always be initialized to have a size of zero as they @@ -42,13 +42,13 @@ impl File { follow, .. 
}: &Context, - ) -> Result { + ) -> Result { let path = data.path(); let metadata = if *follow { - fs::metadata(path).into_report(ErrorCategory::System)? + fs::metadata(path)? } else { - fs::symlink_metadata(path).into_report(ErrorCategory::System)? + fs::symlink_metadata(path)? }; let size = match metric { @@ -65,8 +65,8 @@ impl File { } /// Attempts to query the [`File`]'s underlying inode which is represented by [`Inode`]. - pub fn inode(&self) -> Result { - Inode::try_from(&self.metadata).into_report(ErrorCategory::Internal) + pub fn inode(&self) -> Result { + Inode::try_from(&self.metadata) } /// Gets a mutable reference to the `size` field. diff --git a/src/main.rs b/src/main.rs index c7e8c51..40924a0 100644 --- a/src/main.rs +++ b/src/main.rs @@ -44,18 +44,12 @@ fn run() -> error::Result<()> { .transpose()?; let file_tree = if ctx.threads > 1 { - FileTree::init(&ctx)? + FileTree::init_parallel(&ctx)? } else { FileTree::init(&ctx)? }; - let Some(indextree::NodeEdge::Start(id)) = file_tree.traverse().next() else { - panic!("womp"); - }; - - let root = file_tree[id].get(); - - println!("{root:?}"); + //println!("{}", file_tree[file_tree.root_id()].get().size().value()); if let Some(logger) = logger { logger.flush(); diff --git a/src/tree/mod.rs b/src/tree/mod.rs index 804d84a..4b3ff80 100644 --- a/src/tree/mod.rs +++ b/src/tree/mod.rs @@ -1,7 +1,19 @@ use crate::{error::prelude::*, file::File, user::Context}; -use ignore::{Walk, WalkBuilder}; +use ahash::{HashMap, HashSet}; +use ignore::Walk; use indextree::{Arena, NodeId, Traverse}; -use std::{collections::HashMap, convert::TryFrom, ops::Deref}; +use std::{ + convert::TryFrom, + fs, + ops::Deref, + path::PathBuf, +}; + +/// Utilities for parallel traversal and associated event publishing/consuming. +mod parallel; + +/// Concerned with initializing [`Walk`] and [`WalkParallel`] from the user [`Context`]. 
+mod walker; /// Representation of the file-tree that is traversed starting from the root directory whose index /// in the underlying `arena` is `root_id`. @@ -10,6 +22,7 @@ pub struct FileTree { arena: Arena, } +/// Errors associated with [`FileTree`]. #[derive(Debug, thiserror::Error)] pub enum TreeError { #[error("Failed to query the root directory")] @@ -27,6 +40,10 @@ impl FileTree { Self { root_id, arena } } + pub fn root_id(&self) -> NodeId { + self.root_id + } + /// Initializes a [`FileTree`] completely on one thread. pub fn init(ctx: &Context) -> Result { let mut walker = Walk::try_from(ctx)?; @@ -38,21 +55,25 @@ impl FileTree { let root_node = root_entry .into_report(ErrorCategory::Internal) - .and_then(|data| File::init(data, ctx))?; + .and_then(|data| File::init(data, ctx).into_report(ErrorCategory::Internal))?; let mut arena = Arena::new(); let root_node_id = arena.new_node(root_node); let mut current_dir_id = root_node_id; - let mut dirsize_map = HashMap::new(); + let mut dirsize_map = HashMap::default(); let mut dir_stack = vec![]; dirsize_map.insert(root_node_id, 0); + // To prevent two or more files with the same underlying inode from + // being counted more than once, which would lead to inaccurate disk usage. + let mut inode_set = HashSet::default(); + for dent in walker { let node = match dent .into_report(ErrorCategory::Warning) - .and_then(|data| File::init(data, ctx)) + .and_then(|data| File::init(data, ctx).into_report(ErrorCategory::Warning)) { Ok(data) => data, Err(e) => { @@ -61,8 +82,19 @@ impl FileTree { }, }; + let size = match node.inode() { + Ok(inode) => inode_set + .insert(inode) + .then_some(node.size().value()) + .unwrap_or(0), + Err(e) => { + log::error!("{e}"); + node.size().value() + }, + }; + + // Check if new node is directory before we transfer ownership to `arena`. 
let is_dir = node.file_type().is_some_and(|ft| ft.is_dir()); - let size = node.size().value(); let new_node_id = arena.new_node(node); @@ -105,11 +137,101 @@ impl FileTree { *parent_size += node_size; } - println!("{dirsize_map:?}"); - Ok(Self::new(root_node_id, arena)) } + /// Like [`FileTree::init`] but leverages parallelism for disk-reads and [`File`] initialization. + pub fn init_parallel(ctx: &Context) -> Result { + let mut arena = Arena::new(); + let mut branches = HashMap::>::default(); + + parallel::run(ctx, |file| { + let node_id = arena.new_node(file); + let file = arena[node_id].get(); + let file_path = file.path(); + + if let Some(parent) = file_path.parent() { + if let Some(nodes) = branches.get_mut(parent) { + nodes.push(node_id); + } else { + branches.insert(parent.to_path_buf(), vec![node_id]); + } + } else { + let presumable_system_root = fs::canonicalize(file_path) + .into_report(ErrorCategory::Internal) + .context("Failed to canonicalize presumable root directory")?; + + branches.insert(presumable_system_root, vec![]); + } + Ok(()) + })?; + + let root_path = ctx.dir_canonical()?; + + let root_id = root_path + .parent() + .and_then(|p| branches.get(p)) + .and_then(|b| (b.len() == 1).then(|| b[0])) + .ok_or(TreeError::RootDirMissing) + .into_report(ErrorCategory::Internal)?; + + let mut dfs_queue = vec![root_id]; + let mut inode_set = HashSet::default(); + + 'outer: while let Some(node_id) = dfs_queue.last() { + let current_id = *node_id; + + let current_node_path = arena[current_id].get().path(); + + let Some(children) = branches.get_mut(current_node_path) else { + dfs_queue.pop(); + continue; + }; + + while let Some(child_node_id) = children.pop() { + current_id.append(child_node_id, &mut arena); + + let (child_size, child_is_dir, child_inode) = { + let child_node = arena[child_node_id].get(); + let is_dir = child_node.file_type().is_some_and(|f| f.is_dir()); + let size = child_node.size().value(); + let inode = match child_node.inode() { + 
Ok(value) => Some(value), + Err(err) => { + log::warn!( + "Failed to query inode of {} which may affect disk usage report: {}", + child_node.path().display(), + err + ); + None + } + }; + (size, is_dir, inode) + }; + + if child_is_dir { + dfs_queue.push(child_node_id); + continue 'outer; + } + + if child_inode.map_or(true, |inode| inode_set.insert(inode)) { + *arena[current_id].get_mut().size_mut() += child_size; + } + } + + dfs_queue.pop(); + + if let Some(parent_id) = current_id.ancestors(&arena).skip(1).nth(0) { + let current_dir_size = { + arena[current_id].get().size().value() + }; + *arena[parent_id].get_mut().size_mut() += current_dir_size; + } + } + + Ok(Self { root_id, arena }) + } + pub fn traverse(&self) -> Traverse<'_, File> { self.root_id.traverse(&self.arena) } @@ -122,25 +244,3 @@ impl Deref for FileTree { &self.arena } } - -/// Initializes a single-threaded [`Walk`] instance from [`Context`]. -impl TryFrom<&Context> for Walk { - type Error = Error; - - fn try_from(ctx: &Context) -> Result { - let path = match ctx.dir() { - Some(d) => d.to_path_buf(), - None => Context::get_current_dir()?, - }; - - let walker = WalkBuilder::new(path) - .follow_links(ctx.follow) - .git_ignore(!ctx.no_ignore) - .git_global(!ctx.no_ignore) - .hidden(!ctx.hidden) - .same_file_system(ctx.same_fs) - .build(); - - Ok(walker) - } -} diff --git a/src/tree/parallel.rs b/src/tree/parallel.rs new file mode 100644 index 0000000..3a611f9 --- /dev/null +++ b/src/tree/parallel.rs @@ -0,0 +1,104 @@ +use crate::{error::prelude::*, file::File, user::Context}; +use ignore::{DirEntry, ParallelVisitor, ParallelVisitorBuilder, WalkParallel, WalkState}; +use std::{ + ops::Deref, + result::Result as StdResult, + sync::mpsc::{self, Sender}, + thread, +}; + +pub fn run(ctx: &Context, mut op: F) -> Result<()> +where + F: FnMut(File) -> Result<()> + Send, +{ + let parallel_walker = WalkParallel::try_from(ctx)?; + + let (tx, rx) = mpsc::channel::(); + let mut builder = VisitorBuilder::new(tx.clone(), ctx); + + 
thread::scope(move |scope| { + let handle = scope.spawn(move || { + loop { + match rx.recv().into_report(ErrorCategory::Internal) { + Ok(TraversalState::Ongoing(file)) => op(file)?, + Ok(TraversalState::Error(e)) => log::warn!("{e}"), + Ok(TraversalState::Done) => break, + Err(e) => return Err(e), + } + } + Ok(()) + }); + + parallel_walker.visit(&mut builder); + let _ = tx.send(TraversalState::Done); + + handle.join().unwrap() + })?; + + Ok(()) +} + +pub struct Visitor<'a> { + tx: Sender, + ctx: &'a Context, +} + +pub struct VisitorBuilder<'a> { + tx: Sender, + ctx: &'a Context, +} + +pub enum TraversalState { + Error(Error), + Ongoing(File), + Done, +} + +impl<'a> VisitorBuilder<'a> { + pub fn new(tx: Sender, ctx: &'a Context) -> Self { + Self { tx, ctx } + } +} + +impl<'a> Visitor<'a> { + pub fn new(tx: Sender, ctx: &'a Context) -> Self { + Self { tx, ctx } + } +} + +impl ParallelVisitor for Visitor<'_> { + fn visit(&mut self, entry: StdResult) -> WalkState { + let entry = match entry.into_report(ErrorCategory::Warning) { + Ok(entry) => entry, + Err(e) => { + let _ = self.send(TraversalState::Error(e)); + return WalkState::Continue; + }, + }; + + match File::init(entry, self.ctx).into_report(ErrorCategory::Warning) { + Ok(file) => { + let _ = self.send(TraversalState::Ongoing(file)); + }, + Err(e) => { + let _ = self.send(TraversalState::Error(e)); + }, + } + + WalkState::Continue + } +} + +impl<'a> ParallelVisitorBuilder<'a> for VisitorBuilder<'a> { + fn build(&mut self) -> Box { + Box::new(Visitor::new(Sender::clone(&self.tx), self.ctx)) + } +} + +impl Deref for Visitor<'_> { + type Target = Sender; + + fn deref(&self) -> &Self::Target { + &self.tx + } +} diff --git a/src/tree/walker.rs b/src/tree/walker.rs new file mode 100644 index 0000000..f6e4a6d --- /dev/null +++ b/src/tree/walker.rs @@ -0,0 +1,39 @@ +use crate::{error::prelude::*, user::Context}; +use ignore::{Walk, WalkBuilder, WalkParallel}; +use std::convert::TryFrom; + +/// Initializes a [`Walk`] 
instance from [`Context`] for single-threaded disk-reads as well as +/// [`FileTree`] processing. +impl TryFrom<&Context> for Walk { + type Error = Error; + + fn try_from(ctx: &Context) -> Result { + init_builder(ctx).map(|builder| builder.build()) + } +} + +/// Initializes a [`WalkParallel`] instance from [`Context`] for multi-threaded disk-reads as well +/// as [`FileTree`] processing. +impl TryFrom<&Context> for WalkParallel { + type Error = Error; + + fn try_from(ctx: &Context) -> Result { + init_builder(ctx).map(|builder| builder.build_parallel()) + } +} + +/// Initializes a [`WalkBuilder`] from [`Context`]. +fn init_builder(ctx: &Context) -> Result { + let path = ctx.dir_canonical()?; + let mut builder = WalkBuilder::new(path); + + builder + .follow_links(ctx.follow) + .git_ignore(!ctx.no_ignore) + .git_global(!ctx.no_ignore) + .threads(ctx.threads) + .hidden(!ctx.hidden) + .same_file_system(ctx.same_fs); + + Ok(builder) +} diff --git a/src/user/mod.rs b/src/user/mod.rs index 7117eb8..ade0b93 100644 --- a/src/user/mod.rs +++ b/src/user/mod.rs @@ -1,6 +1,6 @@ use crate::error::prelude::*; use clap::Parser; -use std::{env, path::PathBuf}; +use std::{env, fs, path::PathBuf}; /// Enum definitions for enumerated command-line arguments. pub mod enums; @@ -71,9 +71,16 @@ impl Context { self.dir.as_ref() } + pub fn dir_canonical(&self) -> Result { + match self.dir() { + Some(root) => fs::canonicalize(root).into_report(ErrorCategory::Internal), + None => Self::get_current_dir(), + } + } + pub fn get_current_dir() -> Result { env::current_dir() - .and_then(std::fs::canonicalize) + .and_then(fs::canonicalize) .into_report(ErrorCategory::System) .context("Failed to access current working directory") .set_help("Ensure current directory exists and sufficient permissions are granted")