Split S3 files into smaller files to send large union file #77

Open · wants to merge 3 commits into base: main · Changes from all commits
30 changes: 23 additions & 7 deletions protocol-rpc/src/rpc/private-id-multi-key/client.rs
@@ -100,6 +100,11 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
.long("run_id")
.default_value("")
.help("A run_id used to identify all the logs in a PL/PA run."),
Arg::with_name("s3api_max_rows")
.long("s3api_max_rows")
.takes_value(true)
.default_value("5000000")
.help("Number of rows per each output S3 file to split."),
])
.groups(&[
ArgGroup::with_name("tls")
@@ -114,6 +119,8 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
     let global_timer = timer::Timer::new_silent("global");
     let input_path_str = matches.value_of("input").unwrap_or("input.csv");
     let mut input_path = input_path_str.to_string();
+    let s3api_max_rows_str = matches.value_of("s3api_max_rows").unwrap_or("5000000");
+    let s3_api_max_rows: usize = s3api_max_rows_str.to_string().parse().unwrap();
     if let Ok(s3_path) = S3Path::from_str(input_path_str) {
         info!(
             "Reading {} from S3 and copying to local path",
@@ -358,27 +365,36 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
                 let s3_tempfile = tempfile::NamedTempFile::new().unwrap();
                 let (_file, path) = s3_tempfile.keep().unwrap();
                 let path = path.to_str().expect("Failed to convert path to str");
+                let num_split = ((partner_protocol.get_id_map_size() as f32)
+                    / (s3_api_max_rows as f32))
+                    .ceil() as usize;
                 partner_protocol
-                    .save_id_map(&String::from(path))
+                    .save_id_map(&String::from(path), Some(num_split))
                     .expect("Failed to save id map to tempfile");
-                output_path_s3
-                    .copy_from_local(&path)
-                    .await
-                    .expect("Failed to write to S3");
+                for n in 0..num_split {
+                    let chunk_path = format!("{}_{}", path, n);
+                    output_path_s3
+                        .copy_from_local(&chunk_path)
+                        .await
+                        .expect("Failed to write to S3");
+                }
             } else if let Ok(output_path_gcp) = GCSPath::from_str(p) {
                 let gcs_tempfile = tempfile::NamedTempFile::new().unwrap();
                 let (_file, path) = gcs_tempfile.keep().unwrap();
                 let path = path.to_str().expect("Failed to convert path to str");
                 partner_protocol
-                    .save_id_map(&String::from(path))
+                    .save_id_map(&String::from(path), None)
                     .expect("Failed to save id map to tempfile");
                 output_path_gcp
                     .copy_from_local(&path)
                     .await
                     .expect("Failed to write to GCS");
             } else {
+                let num_split = ((partner_protocol.get_id_map_size() as f32)
+                    / (s3_api_max_rows as f32))
+                    .ceil() as usize;
                 partner_protocol
-                    .save_id_map(&String::from(p))
+                    .save_id_map(&String::from(p), Some(num_split))
                     .expect("Failed to save id map to output file");
             }
         }
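The split count is a ceiling division done by casting through `f32`. A minimal sketch of the same arithmetic (the helper names here are illustrative, not part of the PR): a pure-integer form such as `(rows + max_rows - 1) / max_rows` gives the same answer without the float round-trip, which only represents integers exactly up to 2^24 (~16.7M) and so could round for very large id maps.

```rust
/// Illustrative only: two ways to compute how many chunk files to write.
fn num_split_f32(rows: usize, max_rows: usize) -> usize {
    // Mirrors the PR: cast to f32, divide, take the ceiling.
    ((rows as f32) / (max_rows as f32)).ceil() as usize
}

fn num_split_int(rows: usize, max_rows: usize) -> usize {
    // Pure-integer ceiling division; assumes max_rows > 0.
    (rows + max_rows - 1) / max_rows
}

fn main() {
    assert_eq!(num_split_f32(5_000_000, 5_000_000), 1); // exactly one file
    assert_eq!(num_split_f32(5_000_001, 5_000_000), 2); // one extra row -> second file
    assert_eq!(num_split_int(5_000_001, 5_000_000), 2);
    // An empty id map yields zero chunks, so the S3 upload loop writes nothing.
    assert_eq!(num_split_int(0, 5_000_000), 0);
}
```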
26 changes: 19 additions & 7 deletions protocol-rpc/src/rpc/private-id-multi-key/rpc_server.rs
@@ -43,6 +43,7 @@ pub struct PrivateIdMultiKeyService {
     input_with_headers: bool,
     metrics_path: Option<String>,
     metrics_obj: metrics::Metrics,
+    s3_api_max_rows: usize,
     pub killswitch: Arc<AtomicBool>,
 }

@@ -52,6 +53,7 @@ impl PrivateIdMultiKeyService {
         output_path: Option<&str>,
         input_with_headers: bool,
         metrics_path: Option<String>,
+        s3_api_max_rows: usize,
     ) -> PrivateIdMultiKeyService {
         PrivateIdMultiKeyService {
             protocol: CompanyPrivateIdMultiKey::new(),
@@ -60,6 +62,7 @@
             input_with_headers,
             metrics_path,
             metrics_obj: metrics::Metrics::new("private-id-multi-key".to_string()),
+            s3_api_max_rows,
             killswitch: Arc::new(AtomicBool::new(false)),
         }
     }
@@ -298,26 +301,35 @@ impl PrivateIdMultiKey for PrivateIdMultiKeyService {
                     let s3_tempfile = tempfile::NamedTempFile::new().unwrap();
                     let (_file, path) = s3_tempfile.keep().unwrap();
                     let path = path.to_str().expect("Failed to convert path to str");
+                    let num_split = ((self.protocol.get_id_map_size() as f32)
+                        / (self.s3_api_max_rows as f32))
+                        .ceil() as usize;
                     self.protocol
-                        .save_id_map(&String::from(path))
+                        .save_id_map(&String::from(path), Some(num_split))
                         .expect("Failed to save id map to tempfile");
-                    output_path_s3
-                        .copy_from_local(&path)
-                        .await
-                        .expect("Failed to write to S3");
+                    for n in 0..num_split {
+                        let chunk_path = format!("{}_{}", path, n);
+                        output_path_s3
+                            .copy_from_local(&chunk_path)
+                            .await
+                            .expect("Failed to write to S3");
+                    }
                 } else if let Ok(output_path_gcp) = GCSPath::from_str(p) {
                     let gcs_tempfile = tempfile::NamedTempFile::new().unwrap();
                     let (_file, path) = gcs_tempfile.keep().unwrap();
                     let path = path.to_str().expect("Failed to convert path to str");
                     self.protocol
-                        .save_id_map(&String::from(path))
+                        .save_id_map(&String::from(path), None)
                         .expect("Failed to save id map to tempfile");
                     output_path_gcp
                         .copy_from_local(&path)
                         .await
                         .expect("Failed to write to GCS");
                 } else {
-                    self.protocol.save_id_map(p).unwrap();
+                    let num_split = ((self.protocol.get_id_map_size() as f32)
+                        / (self.s3_api_max_rows as f32))
+                        .ceil() as usize;
+                    self.protocol.save_id_map(p, Some(num_split)).unwrap();
                 }
             }
             None => self.protocol.print_id_map(),
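In both binaries the `save_id_map` signature gains an `Option<usize>`: `Some(n)` splits the output into files named `{path}_0` through `{path}_{n-1}`, while `None` keeps the old single-file behavior (still used on the GCS branch). A hedged sketch of that dispatch, with simplified stand-in types; the real method lives on the protocol structs and writes CSV records via `csv::Writer`, and unlike the PR's `writer_helper` this sketch does not pre-create empty trailing chunk files:

```rust
use std::fs;

/// Illustrative stand-in for the new `save_id_map` shape.
fn save_id_map(path: &str, rows: &[String], num_split: Option<usize>) -> std::io::Result<()> {
    match num_split {
        // Some(n): write up to n chunk files suffixed `_0` .. `_{n-1}`.
        Some(n) if n > 0 => {
            let chunk_size = (rows.len() + n - 1) / n; // ceil(len / n)
            for (i, chunk) in rows.chunks(chunk_size.max(1)).enumerate() {
                fs::write(format!("{}_{}", path, i), chunk.join("\n"))?;
            }
            Ok(())
        }
        // None keeps the original single-file behavior.
        _ => fs::write(path, rows.join("\n")),
    }
}
```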
8 changes: 8 additions & 0 deletions protocol-rpc/src/rpc/private-id-multi-key/server.rs
@@ -99,6 +99,11 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
.long("run_id")
.default_value("")
.help("A run_id used to identify all the logs in a PL/PA run."),
Arg::with_name("s3api_max_rows")
.long("s3api_max_rows")
.takes_value(true)
.default_value("5000000")
.help("Number of rows per each output S3 file to split."),
])
.groups(&[
ArgGroup::with_name("tls")
@@ -129,6 +134,8 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
     let input_with_headers = matches.is_present("input-with-headers");
     let output_path = matches.value_of("output");
     let metric_path = matches.value_of("metric-path");
+    let s3api_max_rows_str = matches.value_of("s3api_max_rows").unwrap_or("5000000");
+    let s3_api_max_rows: usize = s3api_max_rows_str.to_string().parse().unwrap();
 
     let no_tls = matches.is_present("no-tls");
     let host = matches.value_of("host");
@@ -167,6 +174,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
         output_path,
         input_with_headers,
         metrics_output_path,
+        s3_api_max_rows,
     );
 
     let ks = service.killswitch.clone();
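Both binaries read the flag with `.parse().unwrap()`, so a non-numeric value panics with an unhelpful message (and the intermediate `.to_string()` is redundant, since `&str` implements `parse` directly). A small illustrative variant that fails fast with a readable error; the function name is an assumption, and clap's validator hook would be another option:

```rust
// Illustrative: fail fast with a readable message instead of a bare unwrap.
fn parse_max_rows(raw: &str) -> usize {
    raw.parse::<usize>().unwrap_or_else(|_| {
        panic!("--s3api_max_rows must be a positive integer, got {:?}", raw)
    })
}

fn main() {
    assert_eq!(parse_max_rows("5000000"), 5_000_000);
    // parse_max_rows("five million") would panic with the message above.
}
```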
14 changes: 10 additions & 4 deletions protocol/src/private_id_multi_key/company.rs
@@ -5,7 +5,6 @@ use std::collections::HashMap;
 use std::sync::Arc;
 use std::sync::RwLock;
 
-use common::files;
 use common::permutations::gen_permute_pattern;
 use common::permutations::permute;
 use common::permutations::undo_permute;
@@ -481,23 +480,30 @@ impl CompanyPrivateIdMultiKeyProtocol for CompanyPrivateIdMultiKey {
     fn print_id_map(&self) {
         match (self.plaintext.clone().read(), self.id_map.clone().read()) {
             (Ok(data), Ok(id_map)) => {
-                writer_helper(&data, &id_map, None);
+                writer_helper(&data, &id_map, None, None);
             }
             _ => panic!("Cannot print id_map"),
         }
     }
 
-    fn save_id_map(&self, path: &str) -> Result<(), ProtocolError> {
+    fn save_id_map(&self, path: &str, num_split: Option<usize>) -> Result<(), ProtocolError> {
         match (self.plaintext.clone().read(), self.id_map.clone().read()) {
             (Ok(data), Ok(id_map)) => {
-                writer_helper(&data, &id_map, Some(path.to_string()));
+                writer_helper(&data, &id_map, Some(path.to_string()), num_split);
                 Ok(())
             }
             _ => Err(ProtocolError::ErrorIO(
                 "Unable to write partner view to file".to_string(),
             )),
         }
     }
+
+    fn get_id_map_size(&self) -> usize {
+        match self.id_map.clone().read() {
+            Ok(id_map) => id_map.len(),
+            _ => panic!("Cannot get id_map size"),
+        }
+    }
 }
 
 #[cfg(test)]
59 changes: 39 additions & 20 deletions protocol/src/private_id_multi_key/mod.rs
@@ -62,34 +62,53 @@ fn load_data(plaintext: Arc<RwLock<Vec<Vec<String>>>>, path: &str, input_with_headers
     t.qps("text read", text_len);
 }
 
-fn writer_helper(data: &[Vec<String>], id_map: &[(String, usize, bool)], path: Option<String>) {
-    let mut device = match path {
-        Some(path) => {
-            let wr = csv::WriterBuilder::new()
-                .flexible(true)
-                .buffer_capacity(1024)
-                .from_path(path)
-                .unwrap();
-            Some(wr)
-        }
-        None => None,
-    };
+fn writer_helper(
+    data: &[Vec<String>],
+    id_map: &[(String, usize, bool)],
+    path: Option<String>,
+    num_split: Option<usize>,
+) {
+    let mut device_list = Vec::new();
+    let mut chunk_size = id_map.len();
+    match path {
+        Some(path) => match num_split {
+            Some(num_split) => {
+                for n in 0..num_split {
+                    let chunk_path = format!("{}_{}", path, n);
+                    let wr = csv::WriterBuilder::new()
+                        .flexible(true)
+                        .buffer_capacity(1024)
+                        .from_path(chunk_path)
+                        .unwrap();
+                    device_list.push(wr);
+                    chunk_size = ((id_map.len() as f32) / (num_split as f32)).ceil() as usize;
+                }
+            }
+            None => {
+                let wr = csv::WriterBuilder::new()
+                    .flexible(true)
+                    .buffer_capacity(1024)
+                    .from_path(path)
+                    .unwrap();
+                device_list.push(wr);
+            }
+        },
+        None => (),
+    }
 
-    for (key, idx, flag) in id_map.iter() {
+    for (pos, (key, idx, flag)) in id_map.iter().enumerate() {
         let mut v = vec![(*key).clone()];
 
         match flag {
             true => v.extend(data[*idx].clone()),
             false => v.push("NA".to_string()),
         }
 
-        match device {
-            Some(ref mut wr) => {
-                wr.write_record(v.as_slice()).unwrap();
-            }
-            None => {
-                println!("{}", v.join(","));
-            }
+        if device_list.is_empty() {
+            println!("{}", v.join(","));
+        } else {
+            let device = &mut device_list[pos / chunk_size];
+            device.write_record(v.as_slice()).unwrap();
         }
     }
 }
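The row-to-chunk routing is `pos / chunk_size`, with `chunk_size = ceil(id_map.len() / num_split)`. A small illustrative check of how rows distribute (the helper below is an assumption written for this note, not code from the PR): 10 rows split 3 ways gives a chunk size of 4, so the chunks hold 4, 4, and 2 rows.

```rust
// Illustrative only: reproduce the routing rule from writer_helper.
fn chunk_index(pos: usize, len: usize, num_split: usize) -> usize {
    let chunk_size = ((len as f32) / (num_split as f32)).ceil() as usize;
    pos / chunk_size
}

fn main() {
    let (len, num_split) = (10, 3); // chunk_size = ceil(10 / 3) = 4
    let counts = (0..len).fold(vec![0usize; num_split], |mut acc, pos| {
        acc[chunk_index(pos, len, num_split)] += 1;
        acc
    });
    assert_eq!(counts, vec![4, 4, 2]); // the last chunk takes the remainder
}
```

Note one edge case this arithmetic allows: for some combinations (e.g. 5 rows in 4 splits, chunk size 2) the final chunk file is created but ends up empty, so downstream readers should tolerate empty chunks.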