diff --git a/.gitignore b/.gitignore index 9a5685a..82bb985 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,5 @@ # Settings for debug -settings-example.toml +settings.toml vat_abcd_crawler.log # Created by https://www.gitignore.io/api/rust,clion+all diff --git a/.travis.yml b/.travis.yml new file mode 100644 index 0000000..13a152d --- /dev/null +++ b/.travis.yml @@ -0,0 +1,32 @@ +language: rust + +rust: + - stable + +services: + - postgresql + +addons: + postgresql: "11.2" + +cache: cargo + +before_install: + - sudo apt-get update + - sudo apt-get --yes remove postgresql\* + - sudo apt-get install -y postgresql-11 postgresql-client-11 + - sudo cp /etc/postgresql/{9.6,11}/main/pg_hba.conf + - sudo service postgresql restart 11 + +before_script: + - psql -c 'create database travis_ci_test;' -U postgres + - touch settings.toml + - echo '[database]' >> settings.toml + - echo 'database = "travis_ci_test"' >> settings.toml + - echo 'tls = false' >> settings.toml + - echo 'user = "postgres"' >> settings.toml + - echo 'password = ""' >> settings.toml + +script: + - cargo build --verbose --all + - cargo test --verbose --all diff --git a/Cargo.toml b/Cargo.toml index 2035e76..501719b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,7 +11,8 @@ config = { version = "0.9", features = ["toml"] } failure = "0.1" failure_derive = "0.1" log = "0.4" -postgres = { version = "0.15", features = ['with-openssl'] } +postgres = "0.15" +postgres-openssl = "0.1" quick-xml = "0.13" reqwest = "0.9" serde = { version = "1.0", features = ["derive"] } @@ -20,3 +21,6 @@ sha1 = "0.6" simplelog = "0.5" tempfile = "3.0" zip = "0.5" + +[dev-dependencies] +mockito = "0.17.1" diff --git a/README.md b/README.md index c985926..1bb7c8b 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,5 @@ +[![Build Status](https://travis-ci.org/gfbio/vat-abcd-crawler.svg?branch=master)](https://travis-ci.org/gfbio/vat-abcd-crawler) + # VAT ABCD Crawler This repository contains the ABCD crawler for the VAT system. 
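The renamed default settings below introduce new [abcd], [pangaea], and [terminology_service] sections. For orientation, here is a minimal sketch of matching serde-deserializable structs; src/settings.rs is not part of this hunk, so the exact definitions are assumptions, with field names taken from the TOML keys and the usages visible elsewhere in this patch:

use serde::Deserialize;

#[derive(Debug, Deserialize)]
pub struct AbcdSettings {
    pub fields_file: String,
    pub landing_page_field: String,
    pub storage_dir: String,
}

#[derive(Debug, Deserialize)]
pub struct PangaeaSettings {
    pub search_url: String,
    pub scroll_url: String,
}

#[derive(Debug, Deserialize)]
pub struct TerminologyServiceSettings {
    pub landingpage_url: String,
}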
diff --git a/settings.toml b/settings-default.toml similarity index 57% rename from settings.toml rename to settings-default.toml index 77e0223..b135c6a 100644 --- a/settings.toml +++ b/settings-default.toml @@ -8,11 +8,15 @@ dataset_limit = 3 [abcd] fields_file = "abcd-fields.json" +landing_page_field = "/DataSets/DataSet/Metadata/Description/Representation/URI" +storage_dir = "raw_data" -[bms] -monitor_url = "http://bms.gfbio.org/services/xml-archives/?provider=&dsa=" -provider_url = "https://bms.gfbio.org/services/providers/?provider=&name=" -landing_page_url = "http://bms.gfbio.org/services/landingpages/?output=json" +[pangaea] +search_url = "https://ws.pangaea.de/es/dataportal-gfbio/pansimple/_search" +scroll_url = "https://ws.pangaea.de/es/_search/scroll" + +[terminology_service] +landingpage_url = "https://terminologies.gfbio.org/tools/landingpages/landingpage.php" [database] host = "localhost" @@ -22,14 +26,15 @@ database = "" user = "" password = "" schema = "" -dataset_table = "" -temp_dataset_table = "" +dataset_table = "abcd_datasets" +temp_dataset_table = "abcd_datasets_temp" +surrogate_key_column = "surrogate_key" dataset_id_column = "dataset_id" dataset_path_column = "dataset_path" dataset_landing_page_column = "dataset_landing_page" dataset_provider_column = "dataset_provider" -unit_table = "" -temp_unit_table = "" +unit_table = "abcd_units" +temp_unit_table = "abcd_units_temp" listing_view = "dataset_listing" unit_indexed_columns = [ "/DataSets/DataSet/Units/Unit/Gathering/SiteCoordinateSets/SiteCoordinates/CoordinatesLatLong/LongitudeDecimal", diff --git a/src/abcd/abcd_fields.rs b/src/abcd/abcd_fields.rs new file mode 100644 index 0000000..68f0fce --- /dev/null +++ b/src/abcd/abcd_fields.rs @@ -0,0 +1,142 @@ +use std::collections::hash_map::Values; +use std::collections::HashMap; +use std::fs::File; +use std::io::BufReader; +use std::path::Path; + +use failure::Error; +use serde::{Deserialize, Serialize}; + +/// This struct reflects a field within the ABCD fields specification file. +#[derive(Debug, Deserialize, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct AbcdField { + pub name: String, + pub numeric: bool, + pub vat_mandatory: bool, + pub gfbio_mandatory: bool, + pub global_field: bool, + pub unit: String, +} + +type BinaryString = Vec<u8>; + +#[derive(Debug)] +pub struct AbcdFields { + fields: HashMap<BinaryString, AbcdField>, +} + +impl AbcdFields { + pub fn from_path(path: &Path) -> Result<Self, Error> { + let file = File::open(path)?; + let reader = BufReader::new(file); + + Ok(Self { + fields: Self::fields_to_map(serde_json::from_reader(reader)?), + }) + } + + /// This function creates a map from binary field name to `AbcdField` from a list of `AbcdField`s.
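// Illustrative aside (not part of the patch): the map built below is keyed by the raw UTF-8
// bytes of each field's `name`, which is also what `value_of` takes. A minimal lookup sketch,
// assuming an `AbcdFields` instance loaded via `from_path`:
//
//     let key: &[u8] = b"/DataSets/DataSet/DatasetGUID";
//     if let Some(field) = abcd_fields.value_of(key) {
//         assert_eq!(field.name, "/DataSets/DataSet/DatasetGUID");
//     }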
+ fn fields_to_map(fields: Vec<AbcdField>) -> HashMap<Vec<u8>, AbcdField> { + let mut map = HashMap::with_capacity(fields.len()); + for field in fields { + map.insert(field.name.as_bytes().into(), field); + } + map + } + + pub fn value_of(&self, field: &[u8]) -> Option<&AbcdField> { + self.fields.get(field) + } + + pub fn len(&self) -> usize { + self.fields.len() + } +} + +impl<'a> IntoIterator for &'a AbcdFields { + type Item = &'a AbcdField; + type IntoIter = Values<'a, BinaryString, AbcdField>; + + fn into_iter(self) -> Self::IntoIter { + self.fields.values() + } +} + +#[cfg(test)] +mod tests { + use tempfile::TempPath; + + use crate::test_utils; + + use super::*; + + #[test] + fn simple_file() { + let path = create_test_file_path(); + + let abcd_fields = AbcdFields::from_path(&path).expect("Unable to deserialize input."); + + assert_eq!(abcd_fields.len(), 2); + + let field1 = abcd_fields + .value_of(&b"/DataSets/DataSet/DatasetGUID".to_vec()) + .expect("Field not found"); + assert_eq!(field1.name, "/DataSets/DataSet/DatasetGUID"); + assert_eq!(field1.numeric, false); + assert_eq!(field1.vat_mandatory, false); + assert_eq!(field1.gfbio_mandatory, false); + assert_eq!(field1.global_field, true); + assert!(field1.unit.is_empty()); + + let field2 = abcd_fields + .value_of(&b"/DataSets/DataSet/Units/Unit/SourceInstitutionID".to_vec()) + .expect("Field not found"); + assert_eq!( + field2.name, + "/DataSets/DataSet/Units/Unit/SourceInstitutionID" + ); + assert_eq!(field2.numeric, false); + assert_eq!(field2.vat_mandatory, true); + assert_eq!(field2.gfbio_mandatory, true); + assert_eq!(field2.global_field, false); + assert_eq!(field2.unit, "TEST"); + } + + #[test] + fn iterate_values() { + let path = create_test_file_path(); + + let abcd_fields = AbcdFields::from_path(&path).expect("Unable to deserialize input."); + + let mut number_of_fields = 0; + for _field in &abcd_fields { + number_of_fields += 1; + } + + assert_eq!(number_of_fields, 2); + } + + fn create_test_file_path() -> TempPath { + test_utils::create_temp_file( + r#"[ + { + "name": "/DataSets/DataSet/DatasetGUID", + "numeric": false, + "vatMandatory": false, + "gfbioMandatory": false, + "globalField": true, + "unit": "" + }, + { + "name": "/DataSets/DataSet/Units/Unit/SourceInstitutionID", + "numeric": false, + "vatMandatory": true, + "gfbioMandatory": true, + "globalField": false, + "unit": "TEST" + } + ]"#, + ) + } +} diff --git a/src/abcd/abcd_parser.rs b/src/abcd/abcd_parser.rs new file mode 100644 index 0000000..e618dcd --- /dev/null +++ b/src/abcd/abcd_parser.rs @@ -0,0 +1,414 @@ +use std::collections::HashMap; + +use failure::Error; +use failure::Fail; +use quick_xml::events::Event; +use quick_xml::Reader; + +use crate::abcd::{AbcdFields, AbcdVersion}; +use crate::settings::AbcdSettings; +use crate::vat_type::VatType; + +pub type ValueMap = HashMap<String, VatType>; + +/// This parser processes ABCD XML files. +#[derive(Debug)] +pub struct AbcdParser<'a> { + abcd_fields: &'a AbcdFields, + abcd_settings: &'a AbcdSettings, + abcd_version: AbcdVersion, + xml_tag_path: Vec<u8>, + xml_buffer: Vec<u8>, + values: ValueMap, +} + +impl<'a> AbcdParser<'a> { + /// Create a new `AbcdParser`. + pub fn new(abcd_settings: &'a AbcdSettings, abcd_fields: &'a AbcdFields) -> Self { + Self { + abcd_settings, + abcd_fields, + abcd_version: AbcdVersion::Unknown, + xml_tag_path: Vec::new(), + xml_buffer: Vec::new(), + values: ValueMap::new(), + } + } + + /// Parse a binary XML file to `AbcdResult`s.
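// Illustrative aside (not part of the patch): a sketch of how `parse` below is driven; the
// argument values are placeholders, not taken from the original code:
//
//     let mut parser = AbcdParser::new(&abcd_settings, &abcd_fields);
//     let result = parser.parse(
//         "dataset-id",                        // dataset_id
//         "https://example.org/archive.zip",   // dataset_path
//         "https://example.org/landing-page",  // proposal, used if the ABCD data has no URI field
//         "provider name",                     // provider_name
//         &xml_bytes,
//     )?;
//     // `result.dataset` holds the dataset-level values, `result.units` one ValueMap per unit.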
+ pub fn parse( + &mut self, + dataset_id: &str, + dataset_path: &str, + landing_page_proposal: &str, + provider_name: &str, + xml_bytes: &[u8], + ) -> Result { + let mut xml_reader = Reader::from_reader(xml_bytes); + xml_reader.trim_text(true); + + let mut dataset_data = None; + let mut units = Vec::new(); + + loop { + match xml_reader.read_event(&mut self.xml_buffer) { + Ok(Event::Start(ref e)) => { + self.xml_tag_path.push(b'/'); + self.xml_tag_path.extend(Self::strip_tag(e.name())); + + // debug!("XML START: {}", String::from_utf8_lossy(&self.xml_tag_path)); + + match self.xml_tag_path.as_slice() { + b"/DataSets" => { + for attribute in e.attributes().filter_map(Result::ok) { + match attribute.value.as_ref() { + b"http://www.tdwg.org/schemas/abcd/2.06" => { + self.abcd_version = AbcdVersion::Version206; + break; + } + b"http://www.tdwg.org/schemas/abcd/2.1" => { + self.abcd_version = AbcdVersion::Version210; + break; + } + _ => {} + } + } + + // dbg!(&abcd_version); + } + b"/DataSets/DataSet/Units" => { + // eprintln!("Dataset Metadata:"); + // dbg!(&numeric_values); + // dbg!(&textual_values); + // dbg!(units); + + dataset_data = Some(self.finish_map()) + } + _ => {} // ignore other start tags + } + } + Ok(Event::End(ref e)) => { + const SEPARATOR_LENGTH: usize = 1; + + let tag: Vec = Self::strip_tag(e.name()).cloned().collect(); + let stripped_name_length = tag.len(); + + self.xml_tag_path.truncate( + self.xml_tag_path.len() - stripped_name_length - SEPARATOR_LENGTH, + ); + + if self.xml_tag_path == b"/DataSets/DataSet/Units" && tag == b"Unit" { + // eprintln!("Unit Data:"); + // dbg!(&numeric_values); + // dbg!(&textual_values); + + units.push(self.finish_map()); + } + } + Ok(Event::Text(ref e)) => { + if let Some(abcd_field) = self.abcd_fields.value_of(&self.xml_tag_path) { + if abcd_field.numeric { + let string = String::from_utf8_lossy(e.escaped()); + if let Ok(number) = string.parse::() { + self.values.insert(abcd_field.name.clone(), number.into()); + } + } else { + self.values.insert( + abcd_field.name.clone(), + String::from_utf8_lossy(e.escaped()).into(), + ); + } + } + } + Ok(Event::Eof) => break, // exits the loop when reaching end of file + Err(e) => panic!( + "Error at position {}: {:?}", + xml_reader.buffer_position(), + e + ), + _ => (), // ignore all other events + } + + self.xml_buffer.clear(); + } + + self.clear(); // clear resources like buffers + + if let Some(dataset_data) = dataset_data { + let landing_page = if let Some(VatType::Textual(value)) = + dataset_data.get(&self.abcd_settings.landing_page_field) + { + value + } else { + landing_page_proposal + }; + + Ok(AbcdResult::new( + dataset_id.into(), + dataset_path.into(), + landing_page.into(), + provider_name.into(), + dataset_data, + units, + )) + } else { + Err(AbcdContainsNoDatasetMetadata {}.into()) + } + } + + /// Clear all buffers. + fn clear(&mut self) { + self.xml_tag_path.clear(); + self.xml_buffer.clear(); + self.values.clear(); + } + + /// Clear value map and return the old values. + fn finish_map(&mut self) -> ValueMap { + let result = self.values.clone(); + self.values.clear(); + result + } + + /// Strip the namespace from a tag. 
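// Illustrative aside (not part of the patch): `strip_tag` below drops a namespace prefix up to
// and including the first ':' and leaves unprefixed tags untouched, e.g.:
//
//     let stripped: Vec<u8> = AbcdParser::strip_tag(b"abcd:DataSets").cloned().collect();
//     assert_eq!(stripped, b"DataSets".to_vec());
//     // b"DataSets" (no prefix) passes through unchanged.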
+ fn strip_tag(tag: &[u8]) -> impl Iterator { + let has_colon = tag.iter().any(|&b| b == b':'); + tag.iter() + .skip_while(move |&&b| has_colon && b != b':') + .skip(if has_colon { 1 } else { 0 }) // the ':' itself + } +} + +/// This struct reflects the result of a parsed xml file with miscellaneous additional static meta data +pub struct AbcdResult { + pub dataset_id: String, + pub dataset_path: String, + pub landing_page: String, + pub provider_name: String, + pub dataset: ValueMap, + pub units: Vec, +} + +impl AbcdResult { + /// This constructor creates a new `AbcdResult` from dataset and unit data. + pub fn new( + dataset_id: String, + dataset_path: String, + landing_page: String, + provider_name: String, + dataset_data: ValueMap, + units_data: Vec, + ) -> Self { + AbcdResult { + dataset_id, + dataset_path, + landing_page, + provider_name, + dataset: dataset_data, + units: units_data, + } + } +} + +/// This error occurs when a dataset's metadata is missing. +#[derive(Debug, Default, Fail)] +#[fail(display = "ABCD file contains no dataset metadata.")] +struct AbcdContainsNoDatasetMetadata {} + +#[cfg(test)] +mod tests { + use crate::test_utils; + + use super::*; + + const TECHNICAL_CONTACT_NAME: &str = "TECHNICAL CONTACT NAME"; + const DESCRIPTION_TITLE: &str = "DESCRIPTION TITLE"; + const LANDING_PAGE: &str = "http://LANDING-PAGE/"; + const UNIT_ID: &str = "UNIT ID"; + const UNIT_LONGITUDE: f64 = 10.911; + const UNIT_LATITUDE: f64 = 49.911; + const UNIT_SPATIAL_DATUM: &str = "TECHNICAL WGS84 EMAIL"; + + #[test] + fn simple_file() { + let abcd_fields = create_abcd_fields(); + let abcd_settings = AbcdSettings { + fields_file: "".into(), + landing_page_field: "/DataSets/DataSet/Metadata/Description/Representation/URI".into(), + storage_dir: "raw_data".into(), + }; + + let test_file = create_file_as_bytes(); + + let mut parser = AbcdParser::new(&abcd_settings, &abcd_fields); + + let dataset_id = "dataset_id"; + let dataset_path = "dataset_path"; + let landing_page_proposal = "landing_page proposal"; + let provider_name = "provider_id"; + + let result = parser + .parse( + dataset_id, + dataset_path, + landing_page_proposal, + provider_name, + &test_file, + ) + .expect("Unable to parse bytes"); + + assert_eq!(result.dataset_id, dataset_id); + assert_eq!(result.dataset_path, dataset_path); + assert_eq!(result.landing_page, LANDING_PAGE); + assert_eq!(result.provider_name, provider_name); + + assert_eq!( + Some(&VatType::Textual(TECHNICAL_CONTACT_NAME.into())), + result + .dataset + .get("/DataSets/DataSet/TechnicalContacts/TechnicalContact/Name") + ); + assert_eq!( + Some(&VatType::Textual(DESCRIPTION_TITLE.into())), + result + .dataset + .get("/DataSets/DataSet/Metadata/Description/Representation/Title") + ); + + assert_eq!(result.units.len(), 1); + + let unit = result.units.get(0).unwrap(); + + assert_eq!( + Some(&VatType::Textual(UNIT_ID.into())), + unit.get("/DataSets/DataSet/Units/Unit/UnitID") + ); + assert_eq!( + Some(&VatType::Textual(UNIT_SPATIAL_DATUM.into())), + unit.get("/DataSets/DataSet/Units/Unit/Gathering/SiteCoordinateSets/SiteCoordinates/CoordinatesLatLong/SpatialDatum") + ); + + if let (Some(&VatType::Numeric(longitude)), Some(&VatType::Numeric(latitude))) = ( + unit.get("/DataSets/DataSet/Units/Unit/Gathering/SiteCoordinateSets/SiteCoordinates/CoordinatesLatLong/LongitudeDecimal"), + unit.get("/DataSets/DataSet/Units/Unit/Gathering/SiteCoordinateSets/SiteCoordinates/CoordinatesLatLong/LatitudeDecimal") + ) { + assert!(f64::abs(longitude - UNIT_LONGITUDE) < 0.01); + 
assert!(f64::abs(latitude - UNIT_LATITUDE) < 0.01); + } + } + + fn create_file_as_bytes() -> Vec { + format!( + r#" + + + + + + {TECHNICAL_CONTACT_NAME} + + + + + + {DESCRIPTION_TITLE} + {LANDING_PAGE} + + + + + + {UNIT_ID} + + + + + {UNIT_LONGITUDE} + {UNIT_LATITUDE} + {UNIT_SPATIAL_DATUM} + + + + + + + + + "#, + TECHNICAL_CONTACT_NAME = TECHNICAL_CONTACT_NAME, + DESCRIPTION_TITLE = DESCRIPTION_TITLE, + LANDING_PAGE = LANDING_PAGE, + UNIT_ID = UNIT_ID, + UNIT_LONGITUDE = UNIT_LONGITUDE, + UNIT_LATITUDE = UNIT_LATITUDE, + UNIT_SPATIAL_DATUM = UNIT_SPATIAL_DATUM, + ).into_bytes() + } + + fn create_abcd_fields() -> AbcdFields { + let fields_file = test_utils::create_temp_file( + r#"[ + { + "name": "/DataSets/DataSet/TechnicalContacts/TechnicalContact/Name", + "numeric": false, + "vatMandatory": false, + "gfbioMandatory": true, + "globalField": true, + "unit": "" + }, + { + "name": "/DataSets/DataSet/Metadata/Description/Representation/Title", + "numeric": false, + "vatMandatory": false, + "gfbioMandatory": true, + "globalField": true, + "unit": "" + }, + { + "name": "/DataSets/DataSet/Metadata/Description/Representation/URI", + "numeric": false, + "vatMandatory": false, + "gfbioMandatory": true, + "globalField": true, + "unit": "" + }, + { + "name": "/DataSets/DataSet/Units/Unit/UnitID", + "numeric": false, + "vatMandatory": false, + "gfbioMandatory": true, + "globalField": false, + "unit": "" + }, + { + "name": "/DataSets/DataSet/Units/Unit/Gathering/SiteCoordinateSets/SiteCoordinates/CoordinatesLatLong/LongitudeDecimal", + "numeric": true, + "vatMandatory": true, + "gfbioMandatory": true, + "globalField": false, + "unit": "°" + }, + { + "name": "/DataSets/DataSet/Units/Unit/Gathering/SiteCoordinateSets/SiteCoordinates/CoordinatesLatLong/LatitudeDecimal", + "numeric": true, + "vatMandatory": true, + "gfbioMandatory": true, + "globalField": false, + "unit": "°" + }, + { + "name": "/DataSets/DataSet/Units/Unit/Gathering/SiteCoordinateSets/SiteCoordinates/CoordinatesLatLong/SpatialDatum", + "numeric": false, + "vatMandatory": false, + "gfbioMandatory": true, + "globalField": false, + "unit": "" + } + ]"#, + ); + + AbcdFields::from_path(&fields_file).expect("Unable to create ABCD Fields Spec") + } +} diff --git a/src/abcd_version.rs b/src/abcd/abcd_version.rs similarity index 100% rename from src/abcd_version.rs rename to src/abcd/abcd_version.rs diff --git a/src/abcd/archive_reader.rs b/src/abcd/archive_reader.rs new file mode 100644 index 0000000..8317b88 --- /dev/null +++ b/src/abcd/archive_reader.rs @@ -0,0 +1,159 @@ +use failure::Error; +use std::fs::File; +use std::io::BufReader; +use std::io::Read; +use std::path::Path; +use zip::ZipArchive; + +/// This struct provides a reader that processes a stream of XML files in a ZIP archive. +pub struct ArchiveReader { + archive: ZipArchive>, +} + +impl ArchiveReader { + /// Create an `ArchiveReader` from a path to a ZIP archive. + pub fn from_path(path: &Path) -> Result { + let file = File::open(path)?; + let reader = BufReader::new(file); + let archive = ZipArchive::new(reader)?; + + Ok(Self { archive }) + } + + /// Creates an iterator that traverses over all XML files in the ZIP archive. + pub fn bytes_iter(&mut self) -> ArchiveReaderBytesIter { + ArchiveReaderBytesIter { + index: 0, + end: self.archive.len(), + archive: &mut self.archive, + } + } + + /// Output the number of files in the archive. + pub fn len(&self) -> usize { + self.archive.len() + } +} + +/// This iterator traverses over all files (bytes) in the ZIP archive. 
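// Illustrative aside (not part of the patch): typical use of the reader/iterator pair, mirroring
// the call site in main.rs; `path` is assumed to point at a downloaded ZIP archive:
//
//     let mut archive_reader = ArchiveReader::from_path(&path)?;
//     for xml_bytes_result in archive_reader.bytes_iter() {
//         let xml_bytes: Vec<u8> = xml_bytes_result?; // one Result<Vec<u8>, Error> per file
//         // ... hand the bytes to AbcdParser::parse ...
//     }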
+pub struct ArchiveReaderBytesIter<'a> { + index: usize, + end: usize, + archive: &'a mut ZipArchive>, +} + +impl<'a> Iterator for ArchiveReaderBytesIter<'a> { + type Item = Result, Error>; + + fn next(&mut self) -> Option { + if self.index < self.end { + let result = read_contents_of_file(self.archive, self.index); + + self.index += 1; + + Some(result) + } else { + None + } + } + + fn size_hint(&self) -> (usize, Option) { + let lower_bound = self.end - self.index; + let upper_bound = lower_bound; + (lower_bound, Some(upper_bound)) + } +} + +/// Read the `index`th file from a ZIP archive. +fn read_contents_of_file( + archive: &mut ZipArchive>, + index: usize, +) -> Result, Error> { + let mut inner_file = archive.by_index(index)?; + let mut content = Vec::new(); + inner_file.read_to_end(&mut content)?; + Ok(content) +} + +#[cfg(test)] +mod test { + use super::*; + + use std::io::Write; + use tempfile::{NamedTempFile, TempPath}; + + #[test] + fn read_simple_zip_file() { + let path = create_zip_file(&[MockFile { + name: "Test".into(), + content: "Foobar".into(), + }]); + + let mut reader = ArchiveReader::from_path(&path).expect("Cannot read file."); + let mut archive_iter = reader.bytes_iter(); + + let file = archive_iter + .next() + .expect("Missing first file") + .expect("Unable to read first file"); + + assert_eq!(file, b"Foobar"); + + assert!(archive_iter.next().is_none()); + } + + #[test] + fn read_multiple_files_in_zip_file() { + let path = create_zip_file(&[ + MockFile { + name: "Test".into(), + content: "Foo".into(), + }, + MockFile { + name: "Test2".into(), + content: "Bar".into(), + }, + ]); + + let mut reader = ArchiveReader::from_path(&path).expect("Cannot read file."); + let archive_iter = reader.bytes_iter(); + + let mut number_of_files = 0; + let mut contents = Vec::>::new(); + for bytes in archive_iter.map(Result::unwrap) { + number_of_files += 1; + contents.push(bytes); + } + + assert_eq!(number_of_files, 2); + assert_eq!(contents, vec![b"Foo", b"Bar"]); + } + + struct MockFile { + pub name: String, + pub content: String, + } + + fn create_zip_file(files: &[MockFile]) -> TempPath { + let mut file = NamedTempFile::new().expect("Unable to create file to test."); + + { + let mut zip_writer = zip::ZipWriter::new(&mut file); + + let options = zip::write::FileOptions::default() + .compression_method(zip::CompressionMethod::Stored); + for file in files { + zip_writer + .start_file(file.name.as_str(), options) + .expect("Unable to start file in zip archive."); + zip_writer + .write_all(file.content.as_bytes()) + .expect("Unable to write file in zip archive."); + } + + zip_writer.finish().expect("Unable to finish zip archive."); + } + + file.into_temp_path() + } +} diff --git a/src/abcd/mod.rs b/src/abcd/mod.rs new file mode 100644 index 0000000..d097e71 --- /dev/null +++ b/src/abcd/mod.rs @@ -0,0 +1,9 @@ +mod abcd_fields; +mod abcd_parser; +mod abcd_version; +mod archive_reader; + +pub use self::abcd_fields::{AbcdField, AbcdFields}; +pub use self::abcd_parser::{AbcdParser, AbcdResult, ValueMap}; +pub use self::abcd_version::AbcdVersion; +pub use self::archive_reader::ArchiveReader; diff --git a/src/abcd_fields.rs b/src/abcd_fields.rs deleted file mode 100644 index 9c668ad..0000000 --- a/src/abcd_fields.rs +++ /dev/null @@ -1,35 +0,0 @@ -use failure::Error; -use std::path::Path; -use std::fs::File; -use std::io::BufReader; -use serde::{Deserialize, Serialize}; -use std::collections::HashMap; - -/// This struct reflect a field within the ABCD fields specification file. 
-#[derive(Debug, Deserialize, Serialize)] -#[serde(rename_all = "camelCase")] -pub struct AbcdField { - pub name: String, - pub numeric: bool, - pub vat_mandatory: bool, - pub gfbio_mandatory: bool, - pub global_field: bool, - pub unit: String, -} - -/// This function loads all `AbcdField`s from a given file path. -/// It returns a map from the binary field name to the `AbcdField`. -pub fn load_abcd_fields(path: &Path) -> Result, AbcdField>, Error> { - let file = File::open(path)?; - let reader = BufReader::new(file); - Ok(fields_to_map(serde_json::from_reader(reader)?)) -} - -/// This function creates a map from binary field name to `AbcdField` from a list of `AbcdField`s. -fn fields_to_map(fields: Vec) -> HashMap, AbcdField> { - let mut map = HashMap::with_capacity(fields.len()); - for field in fields { - map.insert(field.name.as_bytes().into(), field); - } - map -} \ No newline at end of file diff --git a/src/abcd_parser.rs b/src/abcd_parser.rs deleted file mode 100644 index 6e2fe28..0000000 --- a/src/abcd_parser.rs +++ /dev/null @@ -1,192 +0,0 @@ -use crate::abcd_fields::AbcdField; -use crate::abcd_version::AbcdVersion; -use crate::vat_type::VatType; -use failure::Error; -use failure::Fail; -use quick_xml::events::Event; -use quick_xml::Reader; -use std::collections::HashMap; - -pub type ValueMap = HashMap; - -/// This parser processes ABCD XML files. -#[derive(Debug)] -pub struct AbcdParser<'a> { - abcd_fields: &'a HashMap, AbcdField>, - abcd_version: AbcdVersion, - xml_tag_path: Vec, - xml_buffer: Vec, - values: ValueMap, -} - -impl<'a> AbcdParser<'a> { - /// Create a new `AbcdParser`. - pub fn new(abcd_fields: &'a HashMap, AbcdField>) -> Self { - Self { - abcd_fields, - abcd_version: AbcdVersion::Unknown, - xml_tag_path: Vec::new(), - xml_buffer: Vec::new(), - values: ValueMap::new(), - } - } - - /// Parse a binary XML file to `AbcdResult`s. 
- pub fn parse(&mut self, - dataset_path: &str, - landing_page: &str, - provider_id: &str, - xml_bytes: &[u8]) -> Result { - let mut xml_reader = Reader::from_reader(xml_bytes); - xml_reader.trim_text(true); - - let mut dataset_data = None; - let mut units = Vec::new(); - - loop { - match xml_reader.read_event(&mut self.xml_buffer) { - Ok(Event::Start(ref e)) => { - self.xml_tag_path.push(b'/'); - self.xml_tag_path.extend(Self::strip_tag(e.name())); - -// debug!("XML START: {}", String::from_utf8_lossy(&self.xml_tag_path)); - - match self.xml_tag_path.as_slice() { - b"/DataSets" => { - for attribute in e.attributes().filter_map(Result::ok) { - match attribute.value.as_ref() { - b"http://www.tdwg.org/schemas/abcd/2.06" => { - self.abcd_version = AbcdVersion::Version206; - break; - } - b"http://www.tdwg.org/schemas/abcd/2.1" => { - self.abcd_version = AbcdVersion::Version210; - break; - } - _ => {} - } - } - -// dbg!(&abcd_version); - } - b"/DataSets/DataSet/Units" => { -// eprintln!("Dataset Metadata:"); -// dbg!(&numeric_values); -// dbg!(&textual_values); -// dbg!(units); - - dataset_data = Some(self.finish_map()) - } - _ => {} // ignore other start tags - } - } - Ok(Event::End(ref e)) => { - const SEPARATOR_LENGTH: usize = 1; - - let tag: Vec = Self::strip_tag(e.name()).cloned().collect(); - let stripped_name_length = tag.len(); - - self.xml_tag_path.truncate(self.xml_tag_path.len() - stripped_name_length - SEPARATOR_LENGTH); - - if self.xml_tag_path == b"/DataSets/DataSet/Units" && tag == b"Unit" { -// eprintln!("Unit Data:"); -// dbg!(&numeric_values); -// dbg!(&textual_values); - - units.push(self.finish_map()); - } - } - Ok(Event::Text(ref e)) => { - if let Some(abcd_field) = self.abcd_fields.get(&self.xml_tag_path) { - if abcd_field.numeric { - let string = String::from_utf8_lossy(e.escaped()); - if let Ok(number) = string.parse::() { - self.values.insert( - abcd_field.name.clone(), - number.into(), - ); - } - } else { - self.values.insert( - abcd_field.name.clone(), - String::from_utf8_lossy(e.escaped()).into(), - ); - } - } - } - Ok(Event::Eof) => break, // exits the loop when reaching end of file - Err(e) => panic!("Error at position {}: {:?}", xml_reader.buffer_position(), e), - _ => (), // ignore all other events - } - - self.xml_buffer.clear(); - } - - self.clear(); // clear resources like buffers - - if let Some(dataset_data) = dataset_data { - Ok(AbcdResult::new( - dataset_path.into(), - landing_page.into(), - provider_id.into(), - dataset_data, - units, - )) - } else { - Err(AbcdContainsNoDatasetMetadata {}.into()) - } - } - - /// Clear all buffers. - fn clear(&mut self) { - self.xml_tag_path.clear(); - self.xml_buffer.clear(); - self.values.clear(); - } - - /// Clear value map and return the old values. - fn finish_map(&mut self) -> ValueMap { - let result = self.values.clone(); - self.values.clear(); - result - } - - /// Strip the namespace from a tag. - fn strip_tag(tag: &[u8]) -> impl Iterator { - let has_colon = tag.iter().any(|&b| b == b':'); - tag.iter() - .skip_while(move |&&b| has_colon && b != b':') - .skip(if has_colon { 1 } else { 0 }) // the ':' itself - } -} - -/// This struct reflects the result of a parsed xml file with miscellaneous additional static meta data -pub struct AbcdResult { - pub dataset_path: String, - pub landing_page: String, - pub provider_id: String, - pub dataset: ValueMap, - pub units: Vec, -} - -impl AbcdResult { - /// This constructor creates a new `AbcdResult` from dataset and unit data. 
- pub fn new(dataset_path: String, - landing_page: String, - provider_id: String, - dataset_data: ValueMap, - units_data: Vec) -> Self { - AbcdResult { - dataset_path, - landing_page, - provider_id, - dataset: dataset_data, - units: units_data, - } - } -} - -/// This error occurs when a dataset's metadata is missing. -#[derive(Debug, Default, Fail)] -#[fail(display = "ABCD file contains no dataset metadata.")] -struct AbcdContainsNoDatasetMetadata {} diff --git a/src/archive_reader.rs b/src/archive_reader.rs deleted file mode 100644 index 4025392..0000000 --- a/src/archive_reader.rs +++ /dev/null @@ -1,73 +0,0 @@ -use failure::Error; -use std::fs::File; -use std::io::BufReader; -use zip::ZipArchive; -use std::io::Read; -use std::path::Path; - -/// This struct provides a reader that processes a stream of XML files in a ZIP archive. -pub struct ArchiveReader { - archive: ZipArchive>, -// archive_name: String, -} - -impl ArchiveReader { - /// Create an `ArchiveReader` from a path to a ZIP archive. - pub fn from_path(path: &Path) -> Result { -// let archive_name = path.display().to_string(); - - let file = File::open(path)?; - let reader = BufReader::new(file); - let archive = ZipArchive::new(reader)?; - - Ok(Self { - archive, - }) - } - - /// Creates an iterator that traverses over all XML files in the ZIP archive. - pub fn bytes_iter(&mut self) -> ArchiveReaderBytesIter { - ArchiveReaderBytesIter { - index: 0, - end: self.archive.len(), - archive: &mut self.archive, - } - } -} - -/// This iterator traverses over all XML files in the ZIP archive. -pub struct ArchiveReaderBytesIter<'a> { - index: usize, - end: usize, - archive: &'a mut ZipArchive>, -} - -impl<'a> Iterator for ArchiveReaderBytesIter<'a> { - type Item = Result, Error>; - - fn next(&mut self) -> Option { - if self.index < self.end { - let result = read_contents_of_file(self.archive, self.index); - - self.index += 1; - - Some(result) - } else { - None - } - } - - fn size_hint(&self) -> (usize, Option) { - let lower_bound = self.end - self.index; - let upper_bound = lower_bound; - (lower_bound, Some(upper_bound)) - } -} - -/// Read the `index`th XML file from a ZIP archive. -fn read_contents_of_file(archive: &mut ZipArchive>, index: usize) -> Result, Error> { - let mut inner_file = archive.by_index(index)?; - let mut content = Vec::new(); - inner_file.read_to_end(&mut content)?; - Ok(content) -} diff --git a/src/bms_datasets.rs b/src/bms_datasets.rs deleted file mode 100644 index b47473c..0000000 --- a/src/bms_datasets.rs +++ /dev/null @@ -1,123 +0,0 @@ -use failure::Error; -use failure::Fail; -use std::path::Path; -use std::fs::File; -use serde::{Deserialize, Serialize}; -use std::path::PathBuf; -use std::io::BufWriter; -use crate::bms_providers::BmsProvider; -use crate::settings::Settings; - -/// This struct contains dataset information from the BMS -#[derive(Debug, Deserialize, Serialize)] -pub struct BmsDataset { - pub provider_datacenter: String, - pub provider_url: String, - pub dsa: String, - pub dataset: String, - pub xml_archives: Vec, -} - -/// This struct contains archive download information for a BMS dataset. -#[derive(Debug, Deserialize, Serialize)] -pub struct BmsXmlArchive { - pub id: String, - pub xml_archive: String, - pub latest: bool, -} - -/// This struct reflects the result of a BMS landing page generator request. 
-#[derive(Debug, Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct BmsLandingPage { - provider: String, - data_set: String, - data_unit: String, -} - -impl BmsDataset { - /// Retrieve the archive with the latest flag from a BMS archive. - pub fn get_latest_archive(&self) -> Result<&BmsXmlArchive, DatasetContainsNoFileError> { - self.xml_archives.iter() - .find(|archive| archive.latest) // get latest archive version - .ok_or_else(|| DatasetContainsNoFileError::new(&self.dataset)) - } - - /// Call the landing page generator from the BMS and return the resulting url string. - pub fn get_landing_page(&self, settings: &Settings, providers: &BmsProvider) -> Result { - reqwest::Client::new() - .get(&format!( - "{}&provider={}&dsa={}", - &settings.bms.landing_page_url, providers.id, self.dsa - )) - .send()? - .json::() - .map(|bms_landing_page| bms_landing_page.data_set) - .map_err(|e| e.into()) - } -} - -/// This function downloads a list of dataset information from the BMS. -pub fn load_bms_datasets(url: &str) -> Result, Error> { - Ok( - reqwest::Client::new() - .get(url) - .send()? - .json()? - ) -} - -/// This struct combines dataset information and a path to the downloaded archive file. -#[derive(Debug)] -pub struct DownloadedBmsDataset<'d> { - pub dataset: &'d BmsDataset, - pub path: PathBuf, - pub url: String, -} - -impl<'d> DownloadedBmsDataset<'d> { - /// Create a new descriptor for a downloaded BMS dataset. - pub fn new(dataset: &'d BmsDataset, path: PathBuf, url: String) -> Self { - Self { dataset, path, url } - } -} - -/// Download all datasets into a given temporary directory. -/// This function returns an iterator over `DownloadedBmsDataset`. -pub fn download_datasets<'d, 't>(temp_dir: &'t Path, datasets: &'d [BmsDataset]) -> impl Iterator, Error>> + 'd { - let temp_dir = temp_dir.to_path_buf(); - datasets.iter().enumerate().map(move |(i, dataset)| { - let url = dataset.get_latest_archive()?.xml_archive.clone(); - let download_file_path = temp_dir.join(Path::new(&format!("{}.zip", i))); - download_dataset(url, download_file_path, dataset) - }) -} - -/// This error occurs when it is not possible to download a dataset archive. -#[derive(Debug, Fail)] -#[fail(display = "Dataset {} contains no file to download.", dataset)] -pub struct DatasetContainsNoFileError { - dataset: String, -} - -impl DatasetContainsNoFileError { - /// Create a new `DatasetContainsNoFileError` from a dataset name. - pub fn new(dataset: &str) -> Self { - Self { - dataset: dataset.to_string(), - } - } -} - -/// Download a dataset (the latest) into the given file path. -pub fn download_dataset(url: String, download_file_path: PathBuf, dataset: &BmsDataset) -> Result { - let mut response = reqwest::get(&url)?; - - let output = File::create(&download_file_path)?; - - // copy file to temp path - let mut writer = BufWriter::new(&output); - std::io::copy(&mut response, &mut writer)?; - - Ok(DownloadedBmsDataset::new(dataset, download_file_path, url)) -} diff --git a/src/bms_providers.rs b/src/bms_providers.rs deleted file mode 100644 index 6d26669..0000000 --- a/src/bms_providers.rs +++ /dev/null @@ -1,35 +0,0 @@ -use failure::Error; -use std::collections::HashMap; -use serde::Deserialize; - -/// This struct contains all provider information. -/// The identifier is the `url`, strange as it seems. 
-#[derive(Debug, Deserialize)] -pub struct BmsProvider { - pub id: String, - pub shortname: String, - pub name: String, - pub url: String, - pub biocase_url: String, -} - -/// This function downloads a list of providers from the BMS. -pub fn load_bms_providers(url: &str) -> Result, Error> { - Ok( - reqwest::Client::new() - .get(url) - .send()? - .json()? - ) -} - -/// This function downloads the BMS providers and provides them -/// as a map from `url`to `BmsProvider`. -pub fn load_bms_providers_as_map(url: &str) -> Result, Error> { - let providers = load_bms_providers(url)?; - Ok( - providers.into_iter() - .map(|provider| (provider.url.clone(), provider)) - .collect() - ) -} diff --git a/src/database_sink.rs b/src/database_sink.rs deleted file mode 100644 index e2a46eb..0000000 --- a/src/database_sink.rs +++ /dev/null @@ -1,579 +0,0 @@ -use crate::abcd_fields::AbcdField; -use crate::abcd_parser::AbcdResult; -use crate::abcd_parser::ValueMap; -use crate::settings; -use failure::{Error, Fail}; -use log::debug; -use postgres::params::ConnectParams; -use postgres::params::Host; -use postgres::tls::openssl::OpenSsl; -use postgres::{Connection, TlsMode}; -use std::collections::hash_map::Entry; -use std::collections::HashMap; -use csv::WriterBuilder; -use postgres::transaction::Transaction; - -const POSTGRES_CSV_CONFIGURATION: &str = "DELIMITER '\t', NULL '', QUOTE '\"', ESCAPE '\"', FORMAT CSV"; - -/// A PostgreSQL database DAO for storing datasets. -pub struct DatabaseSink<'s> { - connection: Connection, - database_settings: &'s settings::Database, - dataset_fields: Vec, - dataset_fields_hash: Vec, - datasets_to_ids: HashMap, - next_dataset_id: u32, - unit_fields: Vec, - unit_fields_hash: Vec, -} - -impl<'s> DatabaseSink<'s> { - /// Create a new PostgreSQL database sink (DAO). - pub fn new(database_settings: &'s settings::Database, - abcd_fields: &HashMap, AbcdField>) -> Result { - // create database connection params from the settings, including optional tls - let negotiator = if database_settings.tls { - Some(OpenSsl::new()?) - } else { - None - }; - let connection_params = ConnectParams::builder() - .user(&database_settings.user, Some(&database_settings.password)) - .port(database_settings.port) - .database(&database_settings.database) - .build(Host::Tcp(database_settings.host.clone())); - - // fill lists of dataset and unit fields and give them a fixed order for the database inserts - let mut dataset_fields = Vec::new(); - let mut dataset_fields_hash = Vec::new(); - let mut unit_fields = Vec::new(); - let mut unit_fields_hash = Vec::new(); - let mut hasher = sha1::Sha1::new(); - for field in abcd_fields.values() { - let hash = { - hasher.reset(); - hasher.update(field.name.as_bytes()); - hasher.digest().to_string() - }; - if field.global_field { - dataset_fields.push(field.name.clone()); - dataset_fields_hash.push(hash); - } else { - unit_fields.push(field.name.clone()); - unit_fields_hash.push(hash); - } - } - - let mut sink = Self { - connection: Connection::connect( - connection_params, - if let Some(negotiator) = &negotiator { - TlsMode::Prefer(negotiator) - } else { - TlsMode::None - }, - )?, - database_settings, - dataset_fields, - dataset_fields_hash, - datasets_to_ids: HashMap::new(), - next_dataset_id: 1, - unit_fields, - unit_fields_hash, - }; - - sink.initialize_temporary_schema(abcd_fields)?; - - Ok(sink) - } - - /// Initialize the temporary database schema. 
- fn initialize_temporary_schema(&mut self, abcd_fields: &HashMap, AbcdField>) -> Result<(), Error> { - self.drop_temporary_tables()?; - - self.create_temporary_dataset_table(abcd_fields)?; - - self.create_temporary_unit_table(abcd_fields)?; - - self.create_and_fill_temporary_mapping_table()?; - - Ok(()) - } - - /// Create and fill a temporary mapping table from hashes to field names. - fn create_and_fill_temporary_mapping_table(&mut self) -> Result<(), Error> { - // create table - self.connection.execute(&format!( - "create table {schema}.{table}_translation (name text not null, hash text not null);", - schema = self.database_settings.schema, - table = self.database_settings.temp_dataset_table - ), &[])?; - - // fill table - let statement = self.connection.prepare(&format!( - "insert into {schema}.{table}_translation(name, hash) VALUES ($1, $2);", - schema = self.database_settings.schema, - table = self.database_settings.temp_dataset_table - ))?; - for (name, hash) in self.dataset_fields.iter().zip(&self.dataset_fields_hash) { - statement.execute(&[name, hash])?; - } - for (name, hash) in self.unit_fields.iter().zip(&self.unit_fields_hash) { - statement.execute(&[name, hash])?; - } - - Ok(()) - } - - /// Create the temporary unit table - fn create_temporary_unit_table(&mut self, abcd_fields: &HashMap, AbcdField>) -> Result<(), Error> { - let mut fields = vec![ - format!("{} int not null", self.database_settings.dataset_id_column), - ]; - - for (field, hash) in self.unit_fields.iter().zip(&self.unit_fields_hash) { - let abcd_field = abcd_fields.get(field.as_bytes()) - .ok_or_else(|| DatabaseSinkError::InconsistentUnitColumns(field.clone()))?; - - let data_type_string = if abcd_field.numeric { "double precision" } else { "text" }; - - // TODO: enforce/filter not null - // let null_string = if abcd_field.vat_mandatory { "NOT NULL" } else { "" } - let null_string = ""; - - fields.push(format!("\"{}\" {} {}", hash, data_type_string, null_string)); - } - - self.connection.execute(&format!( - "CREATE TABLE {schema}.{table} ( {fields} );", - schema = &self.database_settings.schema, - table = self.database_settings.temp_unit_table, - fields = fields.join(",") - ), &[])?; - - Ok(()) - } - - /// Create the temporary dataset table - fn create_temporary_dataset_table(&mut self, abcd_fields: &HashMap, AbcdField>) -> Result<(), Error> { - let mut fields = vec![ - format!("{} int primary key", self.database_settings.dataset_id_column), // id - format!("{} text not null", self.database_settings.dataset_path_column), // path - format!("{} text not null", self.database_settings.dataset_landing_page_column), // landing page - format!("{} text not null", self.database_settings.dataset_provider_column), // provider name - ]; - - for (field, hash) in self.dataset_fields.iter().zip(&self.dataset_fields_hash) { - let abcd_field = abcd_fields.get(field.as_bytes()) - .ok_or_else(|| DatabaseSinkError::InconsistentDatasetColumns(field.clone()))?; - - let data_type_string = if abcd_field.numeric { "double precision" } else { "text" }; - - // TODO: enforce/filter not null - // let null_string = if abcd_field.vat_mandatory { "NOT NULL" } else { "" } - let null_string = ""; - - fields.push(format!("\"{}\" {} {}", hash, data_type_string, null_string)); - } - - self.connection.execute(&format!( - "CREATE TABLE {schema}.{table} ( {fields} );", - schema = &self.database_settings.schema, - table = self.database_settings.temp_dataset_table, - fields = fields.join(",") - ), &[])?; - - Ok(()) - } - - /// Drop all temporary 
tables if they exist. - fn drop_temporary_tables(&mut self) -> Result<(), Error> { - for statement in &[ - // unit temp table - format!("DROP TABLE IF EXISTS {schema}.{table};", - schema = &self.database_settings.schema, - table = &self.database_settings.temp_unit_table), - // dataset temp table - format!("DROP TABLE IF EXISTS {schema}.{table};", - schema = &self.database_settings.schema, - table = &self.database_settings.temp_dataset_table), - // translation temp table - format!("DROP TABLE IF EXISTS {schema}.{table}_translation;", - schema = &self.database_settings.schema, - table = &self.database_settings.temp_dataset_table), - ] { - self.connection.execute(statement, &[])?; - } - - Ok(()) - } - - /// Migrate the temporary tables to the persistent tables. - /// Drops the old tables. - pub fn migrate_schema(&mut self) -> Result<(), Error> { - self.create_indexes_and_statistics()?; - - let transaction = self.connection.transaction_with( - postgres::transaction::Config::new() - .isolation_level(postgres::transaction::IsolationLevel::Serializable) - .read_only(false) - )?; - - self.drop_old_tables(&transaction)?; - - self.rename_temporary_tables(&transaction)?; - - self.rename_constraints_and_indexes(&transaction)?; - - self.create_listing_view(&transaction)?; - - transaction.commit()?; - - Ok(()) - } - - /// Drop old persistent tables. - fn drop_old_tables(&self, transaction: &Transaction) -> Result<(), Error> { - for statement in &[ - // listing view - format!("DROP VIEW IF EXISTS {schema}.{view_name};", - schema = self.database_settings.schema, - view_name = self.database_settings.listing_view), - // unit table - format!("DROP TABLE IF EXISTS {schema}.{table};", - schema = self.database_settings.schema, - table = self.database_settings.unit_table), - // dataset table - format!("DROP TABLE IF EXISTS {schema}.{table};", - schema = self.database_settings.schema, - table = self.database_settings.dataset_table), - // translation table - format!("DROP TABLE IF EXISTS {schema}.{table}_translation;", - schema = self.database_settings.schema, - table = self.database_settings.dataset_table), - ] { - transaction.execute(statement, &[])?; - } - - Ok(()) - } - - /// Rename temporary tables to persistent tables. - fn rename_temporary_tables(&self, transaction: &Transaction) -> Result<(), Error> { - for statement in &[ - // unit table - format!("ALTER TABLE {schema}.{temp_table} RENAME TO {table};", - schema = self.database_settings.schema, - temp_table = self.database_settings.temp_unit_table, - table = self.database_settings.unit_table), - // dataset table - format!("ALTER TABLE {schema}.{temp_table} RENAME TO {table};", - schema = self.database_settings.schema, - temp_table = self.database_settings.temp_dataset_table, - table = self.database_settings.dataset_table), - // translation table - format!("ALTER TABLE {schema}.{temp_table}_translation RENAME TO {table}_translation;", - schema = self.database_settings.schema, - temp_table = self.database_settings.temp_dataset_table, - table = self.database_settings.dataset_table), - ] { - transaction.execute(statement, &[])?; - } - - Ok(()) - } - - /// Rename constraints and indexes from temporary to persistent. 
- fn rename_constraints_and_indexes(&self, transaction: &Transaction) -> Result<(), Error> { - for statement in &[ - // foreign key - format!("ALTER TABLE {schema}.{table} \ - RENAME CONSTRAINT {temp_prefix}_{temp_suffix}_fk TO {prefix}_{suffix}_fk;", - schema = &self.database_settings.schema, - table = &self.database_settings.unit_table, - temp_prefix = &self.database_settings.temp_unit_table, - temp_suffix = &self.database_settings.dataset_id_column, - prefix = &self.database_settings.unit_table, - suffix = &self.database_settings.dataset_id_column), - // index - format!("ALTER INDEX {schema}.{temp_index}_idx RENAME TO {index}_idx;", - schema = &self.database_settings.schema, - temp_index = &self.database_settings.temp_unit_table, - index = &self.database_settings.unit_table), - ] { - transaction.execute(statement, &[])?; - } - - Ok(()) - } - - /// Create foreign key relationships, indexes, clustering and statistics on the temporary tables. - fn create_indexes_and_statistics(&mut self) -> Result<(), Error> { - let foreign_key_statement = format!( - "ALTER TABLE {schema}.{unit_table} \ - ADD CONSTRAINT {unit_table}_{dataset_id}_fk \ - FOREIGN KEY ({dataset_id}) REFERENCES {schema}.{dataset_table}({dataset_id});", - schema = &self.database_settings.schema, - unit_table = &self.database_settings.temp_unit_table, - dataset_id = &self.database_settings.dataset_id_column, - dataset_table = &self.database_settings.temp_dataset_table - ); - debug!("{}", &foreign_key_statement); - self.connection.execute(&foreign_key_statement, &[])?; - let mut hasher = sha1::Sha1::new(); - let indexed_unit_column_names = self.database_settings.unit_indexed_columns.iter() - .map(|field| { - hasher.reset(); - hasher.update(field.as_bytes()); - hasher.digest().to_string() - }) - .collect::>(); - let unit_index_statement = format!( - "CREATE INDEX {unit_table}_idx ON {schema}.{unit_table} \ - USING btree ({dataset_id}, \"{other}\");", - schema = &self.database_settings.schema, - unit_table = &self.database_settings.temp_unit_table, - dataset_id = &self.database_settings.dataset_id_column, - other = indexed_unit_column_names.join("\", \"") - ); - debug!("{}", &unit_index_statement); - self.connection.execute(&unit_index_statement, &[])?; - let cluster_statement = format!( - "CLUSTER {unit_table}_idx ON {schema}.{unit_table};", - schema = &self.database_settings.schema, - unit_table = &self.database_settings.temp_unit_table - ); - debug!("{}", &cluster_statement); - self.connection.execute(&cluster_statement, &[])?; - let datasets_analyze_statement = format!( - "VACUUM ANALYZE {schema}.{dataset_table};", - schema = &self.database_settings.schema, - dataset_table = &self.database_settings.temp_dataset_table - ); - debug!("{}", &datasets_analyze_statement); - self.connection.execute(&datasets_analyze_statement, &[])?; - let units_analyze_statement = format!( - "VACUUM ANALYZE {schema}.{unit_table};", - schema = &self.database_settings.schema, - unit_table = &self.database_settings.temp_unit_table - ); - debug!("{}", &units_analyze_statement); - self.connection.execute(&units_analyze_statement, &[])?; - - Ok(()) - } - - /// Create view that provides a listing view - pub fn create_listing_view(&self, - transaction: &Transaction) -> Result<(), Error> { - // TODO: replace full names with settings call - let mut hasher = sha1::Sha1::new(); - - hasher.update(b"/DataSets/DataSet/Metadata/Description/Representation/Title"); - let dataset_name = hasher.digest().to_string(); - hasher.reset(); - - 
hasher.update(b"/DataSets/DataSet/Units/Unit/Gathering/SiteCoordinateSets/SiteCoordinates/CoordinatesLatLong/LatitudeDecimal"); - let latitude_column_hash = hasher.digest().to_string(); - hasher.reset(); - - hasher.update(b"/DataSets/DataSet/Units/Unit/Gathering/SiteCoordinateSets/SiteCoordinates/CoordinatesLatLong/LongitudeDecimal"); - let longitude_column_hash = hasher.digest().to_string(); - hasher.reset(); - - let view_statement = format!( - r#" - CREATE VIEW {schema}.{view_name} AS ( - select link, dataset, file, provider, isGeoReferenced as available, isGeoReferenced - from ( - select {dataset_landing_page_column} as link, - "{dataset_name}" as dataset, - {dataset_path_column} as file, - {dataset_provider_column} as provider, - (SELECT EXISTS( - select * from {schema}.{unit_table} - where {dataset_table}.{dataset_id_column} = {unit_table}.{dataset_id_column} - and "{latitude_column_hash}" is not null - and "{longitude_column_hash}" is not null - )) as isGeoReferenced - from {schema}.{dataset_table} - ) sub);"#, - schema = self.database_settings.schema, - view_name = self.database_settings.listing_view, - dataset_name = dataset_name, - dataset_landing_page_column = self.database_settings.dataset_landing_page_column, - dataset_path_column = self.database_settings.dataset_path_column, - dataset_provider_column = self.database_settings.dataset_provider_column, - dataset_table = self.database_settings.dataset_table, - unit_table = self.database_settings.unit_table, - dataset_id_column = self.database_settings.dataset_id_column, - latitude_column_hash = latitude_column_hash, - longitude_column_hash = longitude_column_hash, - ); - - transaction.execute(&view_statement, &[])?; - - Ok(()) - } - - /// Insert a dataset and its units into the temporary tables. 
- pub fn insert_dataset(&mut self, abcd_data: &AbcdResult) -> Result<(), Error> { - // retrieve the id for the dataset - // if the dataset is not found, it is necessary to create a dataset database entry at first - let dataset_unique_string = self.to_combined_string(&abcd_data.dataset); - let dataset_id = match self.datasets_to_ids.entry(dataset_unique_string) { - Entry::Occupied(e) => *e.get(), - Entry::Vacant(o) => { - // retrieve next dataset id - let id = self.next_dataset_id; - - Self::insert_dataset_metadata( - &self.database_settings, - &self.connection, - self.dataset_fields.as_slice(), - self.dataset_fields_hash.as_slice(), - abcd_data, - id, - )?; - - // store id in map and increase next id variable - o.insert(id); - self.next_dataset_id += 1; - - id - } - }; - - self.insert_units(&abcd_data, dataset_id)?; - - Ok(()) - } - - /// Insert the dataset metadata into the temporary schema - fn insert_dataset_metadata(database_settings: &settings::Database, - connection: &Connection, - dataset_fields: &[String], - dataset_fields_hash: &[String], - abcd_data: &AbcdResult, - id: u32) -> Result<(), Error> { - let mut values = WriterBuilder::new() - .terminator(csv::Terminator::Any(b'\n')) - .delimiter(b'\t') - .quote(b'"') - .escape(b'"') - .has_headers(false) - .from_writer(vec![]); - let mut columns: Vec<&str> = vec![ - database_settings.dataset_id_column.as_ref(), - database_settings.dataset_path_column.as_ref(), - database_settings.dataset_landing_page_column.as_ref(), - database_settings.dataset_provider_column.as_ref(), - ]; - values.write_field(id.to_string())?; - values.write_field(abcd_data.dataset_path.clone())?; - values.write_field(abcd_data.landing_page.clone())?; - values.write_field(abcd_data.provider_id.clone())?; - for (field, hash) in dataset_fields.iter().zip(dataset_fields_hash.iter()) { - columns.push(&hash); - if let Some(value) = abcd_data.dataset.get(field) { - values.write_field(value.to_string())?; - } else { - values.write_field("")?; - } - } - // terminate record - values.write_record(None::<&[u8]>)?; - - let copy_statement = format!( - "COPY {schema}.{table}(\"{columns}\") FROM STDIN WITH ({options})", - schema = database_settings.schema, - table = database_settings.temp_dataset_table, - columns = columns.join("\",\""), - options = POSTGRES_CSV_CONFIGURATION - ); - // dbg!(©_statement); - - let value_string = values.into_inner()?; - // dbg!(String::from_utf8_lossy(value_string.as_slice())); - - let statement = connection.prepare(©_statement)?; - statement.copy_in( - &[], - &mut value_string.as_slice(), - )?; - - Ok(()) - } - - /// Insert the dataset units into the temporary schema - fn insert_units(&mut self, abcd_data: &AbcdResult, dataset_id: u32) -> Result<(), Error> { - let mut columns: Vec = vec![self.database_settings.dataset_id_column.clone()]; - columns.extend_from_slice(self.unit_fields_hash.as_slice()); - - let dataset_id_string = dataset_id.to_string(); - - let mut values = WriterBuilder::new() - .terminator(csv::Terminator::Any(b'\n')) - .delimiter(b'\t') - .quote(b'"') - .escape(b'"') - .has_headers(false) - .from_writer(vec![]); - - // append units one by one to tsv - for unit_data in &abcd_data.units { - values.write_field(&dataset_id_string)?; // put id first - - for field in &self.unit_fields { - if let Some(value) = unit_data.get(field) { - values.write_field(value.to_string())?; - } else { - values.write_field("")?; - } - } - - values.write_record(None::<&[u8]>)?; // terminate record - } - - let copy_statement = format!( - "COPY 
{schema}.{table}(\"{columns}\") FROM STDIN WITH ({options})", - schema = self.database_settings.schema, - table = self.database_settings.temp_unit_table, - columns = columns.join("\",\""), - options = POSTGRES_CSV_CONFIGURATION - ); - - let statement = self.connection.prepare(©_statement)?; -// dbg!(&value_string); - statement.copy_in( - &[], - &mut values.into_inner()?.as_slice(), - )?; - - Ok(()) - } - - /// Combines all values of the dataset's metadata into a new string. - fn to_combined_string(&self, dataset_data: &ValueMap) -> String { - let mut hash = String::new(); - - for field in &self.dataset_fields { - if let Some(value) = dataset_data.get(field) { - hash.push_str(&value.to_string()); - } - } - - hash - } -} - -/// An error enum for different database sink errors. -#[derive(Debug, Fail)] -pub enum DatabaseSinkError { - /// This error occurs when there is an inconsistency between the ABCD dataset data and the sink's columns. - #[fail(display = "Inconsistent dataset columns: {}", 0)] - InconsistentDatasetColumns(String), - /// This error occurs when there is an inconsistency between the ABCD unit data and the sink's columns. - #[fail(display = "Inconsistent unit columns: {}", 0)] - InconsistentUnitColumns(String), -} diff --git a/src/file_downloader.rs b/src/file_downloader.rs new file mode 100644 index 0000000..2bf5990 --- /dev/null +++ b/src/file_downloader.rs @@ -0,0 +1,56 @@ +use failure::Error; +use std::fs::File; +use std::io::BufWriter; +use std::path::Path; + +pub struct FileDownloader { + url: String, +} + +impl FileDownloader { + pub fn from_url(url: &str) -> Self { + Self { url: url.into() } + } + + pub fn to_path(&self, path: &Path) -> Result<(), Error> { + let mut response = reqwest::get(&self.url)?; + + if !response.status().is_success() { + return Err(failure::err_msg(format!( + "Webserver responded with code: {}", + response.status(), + ))); + } + + let output_file = File::create(&path)?; + + let mut writer = BufWriter::new(&output_file); + std::io::copy(&mut response, &mut writer)?; + + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + use crate::test_utils::{create_empty_temp_file, MockWebserver}; + use std::fs; + + #[test] + fn download_file() { + const CONTENT: &str = "foobar"; + + let webserver = MockWebserver::from_text("/", "GET", CONTENT); + let download_file = create_empty_temp_file(); + + FileDownloader::from_url(&webserver.webserver_root_url()) + .to_path(&download_file) + .unwrap(); + + let file_content = fs::read_to_string(download_file).unwrap(); + + assert_eq!(CONTENT, file_content); + } +} diff --git a/src/main.rs b/src/main.rs index edb4e15..db134d1 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,172 +1,237 @@ -mod abcd_fields; -mod abcd_parser; -mod abcd_version; -mod archive_reader; -mod bms_datasets; -mod bms_providers; -mod database_sink; -mod settings; -mod vat_type; +use std::fs::File; +use std::path::Path; -use clap::{App, Arg, crate_authors, crate_description, crate_version}; -use crate::abcd_fields::load_abcd_fields; -use crate::abcd_parser::AbcdParser; -use crate::archive_reader::ArchiveReader; -use crate::bms_datasets::download_datasets; -use crate::bms_datasets::load_bms_datasets; -use crate::database_sink::DatabaseSink; +use clap::{crate_authors, crate_description, crate_version, App, Arg}; use failure::Error; -use log::{info, trace, warn, error}; -use settings::Settings; +use log::{error, info, trace, warn}; use simplelog::{CombinedLogger, SharedLogger, TermLogger, WriteLogger}; -use std::fs::File; -use std::path::Path; 
-use crate::bms_providers::load_bms_providers_as_map; -fn main() { - let matches = App::new("VAT ABCD Crawler") - .version(crate_version!()) - .author(crate_authors!()) - .about(crate_description!()) - .arg(Arg::with_name("settings") - .index(1) - .short("s") - .long("settings") - .value_name("SETTINGS") - .help("Specify the settings file") - .required(true) - .takes_value(true)) - .get_matches(); +use settings::Settings; - let settings_path = Path::new( - matches.value_of("settings").expect("There must be a settings path specified.") - ); - let settings = Settings::new(settings_path).expect("Unable to use config file."); +use crate::abcd::{AbcdFields, AbcdParser, ArchiveReader}; +use crate::file_downloader::FileDownloader; +use crate::pangaea::{PangaeaSearchResult, PangaeaSearchResultEntry}; +use crate::settings::TerminologyServiceSettings; +use crate::storage::DatabaseSink; - initialize_logger(Path::new(&settings.general.log_file), &settings).expect("Unable to initialize logger."); +mod abcd; +mod file_downloader; +mod pangaea; +mod settings; +mod storage; +#[cfg(test)] +mod test_utils; +mod vat_type; - let temp_dir = match tempfile::tempdir() { - Ok(dir) => dir, - Err(e) => { - error!("Unable to create temporary directory: {}", e); - return; // stop program - } - }; +fn main() -> Result<(), Error> { + let settings = initialize_settings().expect("Unable to load settings file."); + + initialize_logger(Path::new(&settings.general.log_file), &settings) + .expect("Unable to initialize logger."); - let abcd_fields = match load_abcd_fields(Path::new(&settings.abcd.fields_file)) { - Ok(map) => map, + let abcd_fields = match AbcdFields::from_path(Path::new(&settings.abcd.fields_file)) { + Ok(fields) => fields, Err(e) => { error!("Unable to load ABCD file: {}", e); - return; // stop program + return Err(e); // stop program } }; let mut database_sink = match DatabaseSink::new(&settings.database, &abcd_fields) { Ok(sink) => sink, Err(e) => { - error!("Unable to create database sink: {}", e); - return; // stop program + error!("Unable to create storage sink: {}", e); + return Err(e); // stop program } }; - let bms_providers = match load_bms_providers_as_map(&settings.bms.provider_url) { - Ok(providers) => providers, + let datasets = match PangaeaSearchResult::retrieve_all_entries(&settings.pangaea) { + Ok(search_entries) => search_entries, Err(e) => { - error!("Unable to download providers from BMS: {}", e); - return; // stop program + error!("Unable to download dataset metadata from Pangaea: {}", e); + return Err(e); // stop program } }; - let bms_datasets = match load_bms_datasets(&settings.bms.monitor_url) { - Ok(datasets) => datasets, - Err(e) => { - error!("Unable to download datasets from BMS: {}", e); - return; // stop program - } + if let Err(e) = process_datasets(&settings, &abcd_fields, &mut database_sink, &datasets) { + error!("Error processing datasets: {}", e); }; - let mut abcd_parser = AbcdParser::new(&abcd_fields); + Ok(()) +} - for path_result in download_datasets(temp_dir.path(), &bms_datasets) - .skip(settings.debug.dataset_start.filter(|_| settings.general.debug).unwrap_or(std::usize::MIN)) - .take(settings.debug.dataset_limit.filter(|_| settings.general.debug).unwrap_or(std::usize::MAX)) { - let download = match path_result { - Ok(d) => d, - Err(e) => { - warn!("Unable to download file: {}", e); - continue; - } - }; - trace!("Temp file: {}", download.path.display()); - info!("Processing `{}` @ `{}` ({})", - download.dataset.dataset, - download.dataset.provider_datacenter, - 
download.dataset.get_latest_archive() - .map(|archive| archive.xml_archive.as_str()) - .unwrap_or_else(|_| "-") +fn process_datasets( + settings: &Settings, + abcd_fields: &AbcdFields, + database_sink: &mut DatabaseSink, + datasets: &[PangaeaSearchResultEntry], +) -> Result<(), Error> { + let temp_dir = tempfile::tempdir()?; + let storage_dir = Path::new(&settings.abcd.storage_dir); + + create_or_check_for_directory(&storage_dir); + + let mut abcd_parser = AbcdParser::new(&settings.abcd, &abcd_fields); + + for dataset in datasets + .iter() + .skip( + settings + .debug + .dataset_start + .filter(|_| settings.general.debug) + .unwrap_or(std::usize::MIN), + ) + .take( + settings + .debug + .dataset_limit + .filter(|_| settings.general.debug) + .unwrap_or(std::usize::MAX), + ) + { + let file_name = dataset + .id() + .chars() + .map(|c| match c { + 'a'..='z' | 'A'..='Z' | '-' => c, + _ => '_', + }) + .collect::(); + let temp_file_path = temp_dir.path().join(&file_name).with_extension("zip"); + let storage_file_path = storage_dir.join(&file_name).with_extension("zip"); + + if let Err(e) = FileDownloader::from_url(dataset.download_url()).to_path(&temp_file_path) { + warn!( + "Unable to download file {url} to {path}: {error}", + url = dataset.download_url(), + path = temp_file_path.display(), + error = e, + ); + + let recovery_file_path = storage_file_path.as_path(); + match std::fs::copy(recovery_file_path, &temp_file_path) { + Ok(_) => info!("Recovered file {file}", file = file_name), + Err(e) => { + warn!( + "Recovery of file {file} failed: {error}", + file = file_name, + error = e, + ); + + continue; // skip processing this dataset + } + }; + } + + trace!("Temp file: {}", temp_file_path.display()); + info!( + "Processing `{}` @ `{}` ({})", + dataset.id(), + dataset.publisher(), + dataset.download_url(), ); - let bms_provider = match bms_providers.get(&download.dataset.provider_url) { - Some(provider) => provider, - None => { - warn!("Unable to retrieve BMS provider from map for {}", download.dataset.provider_url); - continue; - } - }; + let landing_page_url: String = + propose_landing_page(&settings.terminology_service, dataset.download_url()); - let landing_page = match download.dataset.get_landing_page(&settings, &bms_provider) { - Ok(landing_page) => landing_page, + let mut archive_reader = match ArchiveReader::from_path(&temp_file_path) { + Ok(reader) => reader, Err(e) => { - warn!("Unable to generate landing page for {}; {}", download.dataset.dataset, e); + warn!("Unable to read dataset archive: {}", e); continue; } }; - for xml_bytes_result in ArchiveReader::from_path(&download.path).unwrap().bytes_iter() { + let mut all_inserts_successful = true; + + for xml_bytes_result in archive_reader.bytes_iter() { let xml_bytes = match xml_bytes_result { Ok(bytes) => bytes, Err(e) => { warn!("Unable to read file from zip archive: {}", e); + all_inserts_successful = false; continue; } }; -// let mut string = String::from_utf8(xml_bytes).unwrap(); -// string.truncate(200); -// dbg!(string); - - let abcd_data = match abcd_parser.parse(&download.url, - &landing_page, - &bms_provider.name, - &xml_bytes) { + let abcd_data = match abcd_parser.parse( + dataset.id(), + dataset.download_url(), + &landing_page_url, + &dataset.publisher(), + &xml_bytes, + ) { Ok(data) => data, Err(e) => { warn!("Unable to retrieve ABCD data: {}", e); + all_inserts_successful = false; continue; } }; trace!("{:?}", abcd_data.dataset); -// for unit in abcd_data.units { -// trace!("{:?}", unit); -// } match 
database_sink.insert_dataset(&abcd_data) { Ok(_) => (), - Err(e) => warn!("Unable to insert dataset into database: {}", e), + Err(e) => { + warn!("Unable to insert dataset into storage: {}", e); + all_inserts_successful = false; + } }; } + + if all_inserts_successful && archive_reader.len() > 0 { + if let Err(e) = std::fs::copy(&temp_file_path, storage_file_path) { + warn!("Unable to store ABCD file: {}", e); + } + } } match database_sink.migrate_schema() { Ok(_) => info!("Schema migration complete."), Err(e) => warn!("Unable to migrate schema: {}", e), }; + + Ok(()) +} + +fn create_or_check_for_directory(storage_dir: &&Path) { + if storage_dir.exists() { + assert!( + storage_dir.is_dir(), + "ABCD storage directory path is not a directory", + ); + } else { + std::fs::create_dir(&storage_dir).expect("ABCD storage directory is not creatable"); + } +} + +fn initialize_settings() -> Result { + let matches = App::new("VAT ABCD Crawler") + .version(crate_version!()) + .author(crate_authors!()) + .about(crate_description!()) + .arg( + Arg::with_name("settings") + .index(1) + .short("s") + .long("settings") + .value_name("SETTINGS") + .help("Specify the settings file") + .required(false) + .takes_value(true), + ) + .get_matches(); + + let settings_path = matches.value_of("settings").map(Path::new); + + Ok(Settings::new(settings_path)?) } /// Initialize the logger. fn initialize_logger(file_path: &Path, settings: &Settings) -> Result<(), Error> { - let mut loggers: Vec> = Vec::new(); + let mut loggers: Vec> = Vec::new(); let log_level = if settings.general.debug { simplelog::LevelFilter::Debug @@ -179,12 +244,25 @@ fn initialize_logger(file_path: &Path, settings: &Settings) -> Result<(), Error> } if let Ok(file) = File::create(file_path) { - loggers.push( - WriteLogger::new(log_level, simplelog::Config::default(), file) - ); + loggers.push(WriteLogger::new( + log_level, + simplelog::Config::default(), + file, + )); } CombinedLogger::init(loggers)?; Ok(()) } + +fn propose_landing_page( + terminology_service_settings: &TerminologyServiceSettings, + dataset_url: &str, +) -> String { + format!( + "{base_url}?archive={dataset_url}", + base_url = terminology_service_settings.landingpage_url, + dataset_url = dataset_url, + ) +} diff --git a/src/pangaea/mod.rs b/src/pangaea/mod.rs new file mode 100644 index 0000000..4b2b49e --- /dev/null +++ b/src/pangaea/mod.rs @@ -0,0 +1,3 @@ +mod search_result; + +pub use self::search_result::{PangaeaSearchResult, PangaeaSearchResultEntry}; diff --git a/src/pangaea/search_result.rs b/src/pangaea/search_result.rs new file mode 100644 index 0000000..222e44f --- /dev/null +++ b/src/pangaea/search_result.rs @@ -0,0 +1,346 @@ +use crate::settings::PangaeaSettings; +use failure::Error; +use log::info; +use serde::Deserialize; +use serde_json::json; +use std::collections::HashMap; + +#[derive(Clone, Debug, Deserialize, PartialEq)] +pub struct PangaeaSearchResult { + #[serde(rename = "_scroll_id")] + scroll_id: String, + hits: PangaeaSearchResultHits, +} + +#[derive(Clone, Debug, Deserialize, PartialEq)] +struct PangaeaSearchResultHits { + total: u64, + hits: Vec, +} + +#[derive(Clone, Debug, Deserialize, PartialEq)] +pub struct PangaeaSearchResultEntry { + #[serde(rename = "_id")] + id: String, + #[serde(rename = "_source")] + source: PangaeaSearchResultEntrySource, +} + +#[derive(Clone, Debug, Deserialize, PartialEq)] +struct PangaeaSearchResultEntrySource { + citation_publisher: String, + datalink: String, +} + +impl PangaeaSearchResult { + const SCROLL_TIMEOUT: &'static str 
= "1m"; + + fn from_url(url: &str) -> Result { + let body = json!({ + "query": { + "bool": { + "filter": [ + { + "term": { + "internal-source": "gfbio-abcd-collections" + } + }, + { + "match_phrase": { + "type": "ABCD_Dataset" + } + }, + { + "term": { + "accessRestricted": false + } + } + ] + } + } + }); + + reqwest::Client::new() + .post(&format!( + "{url}?scroll={scroll_timeout}", + url = url, + scroll_timeout = Self::SCROLL_TIMEOUT, + )) + .json(&body) + .send()? + .json::() + .map_err(|e| e.into()) + } + + fn from_scroll_url(url: &str, scroll_id: &str) -> Result { + let mut body = HashMap::new(); + body.insert("scroll", Self::SCROLL_TIMEOUT); + body.insert("scroll_id", scroll_id); + + reqwest::Client::new() + .post(url) + .json(&body) + .send()? + .json::() + .map_err(|e| e.into()) + } + + pub fn retrieve_all_entries( + pangaea_settings: &PangaeaSettings, + ) -> Result, Error> { + let mut entries = Vec::new(); + + let mut result = Self::from_url(&pangaea_settings.search_url)?; + let mut number_of_results = result.hits.hits.len(); + + while number_of_results > 0 { + info!( + "Retrieved {} items from pangaea (continuing - {} total).", + number_of_results, result.hits.total, + ); + entries.append(&mut result.hits.hits); + + result = Self::from_scroll_url(&pangaea_settings.scroll_url, &result.scroll_id)?; + number_of_results = result.hits.hits.len(); + } + + info!("Retrieved {} items from pangaea.", number_of_results); + entries.append(&mut result.hits.hits); + + Ok(entries) + } +} + +impl PangaeaSearchResultEntry { + pub fn id(&self) -> &str { + &self.id + } + + pub fn publisher(&self) -> &str { + &self.source.citation_publisher + } + + pub fn download_url(&self) -> &str { + &self.source.datalink + } +} + +#[cfg(test)] +mod tests { + use super::*; + + use crate::test_utils::MockWebserver; + use serde_json::Value as JsonValue; + + const CITATION_PUBLISHER: &str = "Test Publisher"; + const CITATION_PUBLISHER_2: &str = "Test Publisher 2"; + const DATALINK: &str = "https://foobar.de"; + const DATALINK_2: &str = "https://foobar2.de"; + const RESULT_ID: &str = "test_id"; + const RESULT_ID_2: &str = "test_id_2"; + const SEARCH_RESULT_HITS: u64 = 64; + const SCROLL_ID: &str = "SCROLL_ID_SCROLL_ID"; + const SCROLL_ID_2: &str = "SCROLL_ID_SCROLL_ID_2"; + + const SEARCH_RESULT_ENTRY_SOURCE_JSON: fn() -> JsonValue = || { + json!({ + "citation_publisher": CITATION_PUBLISHER, + "datalink": DATALINK, + }) + }; + const SEARCH_RESULT_ENTRY_SOURCE_JSON_2: fn() -> JsonValue = || { + json!({ + "citation_publisher": CITATION_PUBLISHER_2, + "datalink": DATALINK_2, + }) + }; + const SEARCH_RESULT_ENTRY_JSON: fn() -> JsonValue = || { + json!({ + "_id": RESULT_ID, + "_source": SEARCH_RESULT_ENTRY_SOURCE_JSON(), + }) + }; + const SEARCH_RESULT_ENTRY_JSON_2: fn() -> JsonValue = || { + json!({ + "_id": RESULT_ID_2, + "_source": SEARCH_RESULT_ENTRY_SOURCE_JSON_2(), + }) + }; + const SEARCH_RESULT_HITS_JSON: fn() -> JsonValue = || { + json!({ + "total": SEARCH_RESULT_HITS, + "max_score": 1.0, + "hits": [ + SEARCH_RESULT_ENTRY_JSON(), + SEARCH_RESULT_ENTRY_JSON_2(), + ], + }) + }; + const SEARCH_RESULT_JSON: fn() -> JsonValue = || { + json!({ + "_scroll_id": SCROLL_ID, + "took": 1373, + "hits": SEARCH_RESULT_HITS_JSON(), + }) + }; + + #[test] + fn parse_search_result_entry_source() { + let search_result_entry_source = serde_json::from_str::( + &SEARCH_RESULT_ENTRY_SOURCE_JSON().to_string(), + ) + .unwrap(); + + assert_eq!( + search_result_entry_source, + PangaeaSearchResultEntrySource { + citation_publisher: 
CITATION_PUBLISHER.into(), + datalink: DATALINK.into(), + } + ) + } + + #[test] + fn parse_search_result_entry() { + let search_result_entry = serde_json::from_str::( + &SEARCH_RESULT_ENTRY_JSON().to_string(), + ) + .unwrap(); + + assert_eq!( + search_result_entry, + PangaeaSearchResultEntry { + id: RESULT_ID.to_string(), + source: PangaeaSearchResultEntrySource { + citation_publisher: CITATION_PUBLISHER.into(), + datalink: DATALINK.into(), + }, + } + ) + } + + #[test] + fn parse_search_result_hits() { + let search_result_hits = + serde_json::from_str::(&SEARCH_RESULT_HITS_JSON().to_string()) + .unwrap(); + + assert_eq!( + search_result_hits, + PangaeaSearchResultHits { + total: SEARCH_RESULT_HITS, + hits: vec![ + PangaeaSearchResultEntry { + id: RESULT_ID.to_string(), + source: PangaeaSearchResultEntrySource { + citation_publisher: CITATION_PUBLISHER.into(), + datalink: DATALINK.into(), + }, + }, + PangaeaSearchResultEntry { + id: RESULT_ID_2.to_string(), + source: PangaeaSearchResultEntrySource { + citation_publisher: CITATION_PUBLISHER_2.into(), + datalink: DATALINK_2.into(), + }, + }, + ], + } + ); + } + + #[test] + fn parse_search_result() { + let search_result = + serde_json::from_str::(&SEARCH_RESULT_JSON().to_string()).unwrap(); + + assert_eq!(search_result.scroll_id, SCROLL_ID); + assert_eq!(search_result.hits.hits.len(), 2); + } + + #[test] + fn parse_webserver_result() { + let webserver = MockWebserver::from_json( + &format!("/?scroll={}", PangaeaSearchResult::SCROLL_TIMEOUT), + "POST", + &SEARCH_RESULT_JSON().to_string(), + ); + + let search_result = PangaeaSearchResult::from_url(&webserver.webserver_root_url()).unwrap(); + + assert_eq!(search_result.scroll_id, SCROLL_ID); + assert_eq!(search_result.hits.hits.len(), 2); + } + + #[test] + fn parse_scroll_result() { + let webserver = MockWebserver::from_json("/", "POST", &SEARCH_RESULT_JSON().to_string()); + + let search_result = + PangaeaSearchResult::from_scroll_url(&webserver.webserver_root_url(), SCROLL_ID) + .unwrap(); + + assert_eq!(search_result.scroll_id, SCROLL_ID); + assert_eq!(search_result.hits.hits.len(), 2); + } + + #[test] + fn collect_multiple_request_data() { + let _m1 = + MockWebserver::from_json("/?scroll=1m", "POST", &SEARCH_RESULT_JSON().to_string()); + let _m2 = MockWebserver::from_json_with_json_condition( + "/scroll", + "POST", + &json!({ + "scroll" : PangaeaSearchResult::SCROLL_TIMEOUT, + "scroll_id" : SCROLL_ID, + }) + .to_string(), + &json!({ + "_scroll_id": SCROLL_ID_2, + "took": 1373, + "hits": { + "total": SEARCH_RESULT_HITS, + "hits": [ // <-- CONTINUE + SEARCH_RESULT_ENTRY_JSON(), + SEARCH_RESULT_ENTRY_JSON_2(), + ], + }, + }) + .to_string(), + ); + let _m3 = MockWebserver::from_json_with_json_condition( + "/scroll", + "POST", + &json!({ + "scroll" : PangaeaSearchResult::SCROLL_TIMEOUT, + "scroll_id" : SCROLL_ID_2, + }) + .to_string(), + &json!({ + "_scroll_id": SCROLL_ID_2, + "took": 1373, + "hits": { + "total": SEARCH_RESULT_HITS, + "hits": [], // <-- NO CONTINUE + }, + }) + .to_string(), + ); + + assert_eq!(_m2.webserver_root_url(), _m3.webserver_root_url()); + + let entries = PangaeaSearchResult::retrieve_all_entries(&PangaeaSettings { + search_url: _m1.webserver_root_url(), + scroll_url: format!("{}/scroll", _m2.webserver_root_url()), + }) + .unwrap(); + + assert_eq!(4, entries.len()); + + let entry = &entries[0]; + assert_eq!(RESULT_ID, entry.id()); + assert_eq!(DATALINK, entry.download_url()); + assert_eq!(CITATION_PUBLISHER, entry.publisher()); + } +} diff --git a/src/settings.rs 
b/src/settings.rs index b93e459..0012c85 100644 --- a/src/settings.rs +++ b/src/settings.rs @@ -1,29 +1,36 @@ +use std::path::Path; + use config::Config; -use config::File; use config::ConfigError; +use config::File; use serde::Deserialize; -use std::path::Path; #[derive(Debug, Deserialize)] -pub struct General { +pub struct GeneralSettings { pub log_file: String, pub debug: bool, } #[derive(Debug, Deserialize)] -pub struct Abcd { +pub struct AbcdSettings { pub fields_file: String, + pub landing_page_field: String, + pub storage_dir: String, +} + +#[derive(Debug, Deserialize)] +pub struct PangaeaSettings { + pub search_url: String, + pub scroll_url: String, } #[derive(Debug, Deserialize)] -pub struct Bms { - pub monitor_url: String, - pub provider_url: String, - pub landing_page_url: String, +pub struct TerminologyServiceSettings { + pub landingpage_url: String, } #[derive(Debug, Deserialize)] -pub struct Database { +pub struct DatabaseSettings { pub host: String, pub port: u16, pub tls: bool, @@ -34,6 +41,7 @@ pub struct Database { pub dataset_table: String, pub listing_view: String, pub temp_dataset_table: String, + pub surrogate_key_column: String, pub dataset_id_column: String, pub dataset_path_column: String, pub dataset_landing_page_column: String, @@ -44,7 +52,7 @@ pub struct Database { } #[derive(Debug, Deserialize)] -pub struct Debug { +pub struct DebugSettings { pub dataset_start: Option<usize>, pub dataset_limit: Option<usize>, } @@ -52,18 +60,45 @@ pub struct Debug { /// This struct stores the program settings. #[derive(Debug, Deserialize)] pub struct Settings { - pub abcd: Abcd, - pub bms: Bms, - pub database: Database, - pub debug: Debug, - pub general: General, + pub abcd: AbcdSettings, + pub pangaea: PangaeaSettings, + pub terminology_service: TerminologyServiceSettings, + pub database: DatabaseSettings, + pub debug: DebugSettings, + pub general: GeneralSettings, } impl Settings { - pub fn new(path: &Path) -> Result<Self, ConfigError> { + pub fn new(path: Option<&Path>) -> Result<Self, ConfigError> { let mut s = Config::new(); - s.merge(File::from(path))?; + s.merge(File::from(Path::new("settings-default.toml")))?; + s.merge(File::from(Path::new("settings.toml")))?; + if let Some(path) = path { + s.merge(File::from(path))?; + } s.try_into() } } + +#[cfg(test)] +mod test { + use crate::test_utils; + + use super::*; + + #[test] + fn load_file() { + let path = test_utils::create_temp_file_with_suffix( + ".toml", + r#" + [general] + debug = true + "#, + ); + + let settings = Settings::new(Some(&path)).expect("Unable to load settings."); + + assert!(settings.general.debug); + } +} diff --git a/src/storage/database_sink.rs b/src/storage/database_sink.rs new file mode 100644 index 0000000..62f6f08 --- /dev/null +++ b/src/storage/database_sink.rs @@ -0,0 +1,1357 @@ +use csv::WriterBuilder; +use failure::{Error, Fail}; +use log::debug; +use postgres::params::ConnectParams; +use postgres::params::Host; +use postgres::transaction::Transaction; +use postgres::{Connection, TlsMode}; +use postgres_openssl::OpenSsl; + +use crate::abcd::{AbcdFields, AbcdResult}; +use crate::settings; +use crate::settings::DatabaseSettings; +use crate::storage::{Field, SurrogateKey, SurrogateKeyType}; + +const POSTGRES_CSV_CONFIGURATION: &str = + "DELIMITER '\t', NULL '', QUOTE '\"', ESCAPE '\"', FORMAT CSV"; +
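For reference, `POSTGRES_CSV_CONFIGURATION` mirrors the `csv::WriterBuilder` setup that `insert_dataset_metadata` and `insert_units` use further down (tab delimiter, `"` as quote and escape character, empty fields mapped to NULL), so the buffered rows can be streamed unchanged into `COPY ... FROM STDIN`. A standalone sketch of the byte format such a writer produces; the field values here are made up:

```rust
use csv::{Terminator, WriterBuilder};

fn main() {
    // Same writer configuration as the sink uses when building COPY input.
    let mut writer = WriterBuilder::new()
        .terminator(Terminator::Any(b'\n'))
        .delimiter(b'\t')
        .quote(b'"')
        .escape(b'"')
        .has_headers(false)
        .from_writer(vec![]);

    writer
        .write_record(&["1", "dataset id", "value with \"quotes\"", ""])
        .expect("row is writable");

    let bytes = writer.into_inner().expect("buffer is flushable");
    // Tab-separated, embedded quotes doubled, empty fields become NULL on the COPY side.
    assert_eq!(
        String::from_utf8(bytes).unwrap(),
        "1\tdataset id\t\"value with \"\"quotes\"\"\"\t\n"
    );
}
```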
+/// A PostgreSQL storage DAO for storing datasets. +pub struct DatabaseSink<'s> { + connection: Connection, + database_settings: &'s settings::DatabaseSettings, + dataset_fields: Vec<Field>, + surrogate_key: SurrogateKey, + unit_fields: Vec<Field>, +} + +impl<'s> DatabaseSink<'s> { + /// Create a new PostgreSQL storage sink (DAO). + pub fn new( + database_settings: &'s settings::DatabaseSettings, + abcd_fields: &AbcdFields, + ) -> Result<Self, Error> { + let connection = <Self>::create_database_connection(&database_settings)?; + + let (dataset_fields, unit_fields) = + <Self>::create_lists_of_dataset_and_unit_fields(abcd_fields); + + let mut sink = Self { + connection, + database_settings, + dataset_fields, + surrogate_key: Default::default(), + unit_fields, + }; + + sink.initialize_temporary_schema(abcd_fields)?; + + Ok(sink) + } + + fn create_database_connection( + database_settings: &DatabaseSettings, + ) -> Result<Connection, Error> { + let connection_params = ConnectParams::builder() + .user(&database_settings.user, Some(&database_settings.password)) + .port(database_settings.port) + .database(&database_settings.database) + .build(Host::Tcp(database_settings.host.clone())); + + let negotiator = if database_settings.tls { + Some(OpenSsl::new()?) + } else { + None + }; + let tls_mode = if let Some(ref negotiator) = negotiator { + TlsMode::Prefer(negotiator) + } else { + TlsMode::None + }; + + Ok(Connection::connect(connection_params, tls_mode)?) + } + + fn create_lists_of_dataset_and_unit_fields( + abcd_fields: &AbcdFields, + ) -> (Vec<Field>, Vec<Field>) { + let mut dataset_fields = Vec::new(); + let mut unit_fields = Vec::new(); + + for field in abcd_fields { + if field.global_field { + dataset_fields.push(field.name.as_str().into()); + } else { + unit_fields.push(field.name.as_str().into()); + } + } + + (dataset_fields, unit_fields) + } + + /// Initialize the temporary storage schema. + fn initialize_temporary_schema(&mut self, abcd_fields: &AbcdFields) -> Result<(), Error> { + self.drop_temporary_tables()?; + + self.create_temporary_dataset_table(abcd_fields)?; + + self.create_temporary_unit_table(abcd_fields)?; + + self.create_and_fill_temporary_mapping_table()?; + + Ok(()) + } + + /// Create and fill a temporary mapping table from hashes to field names.
+ fn create_and_fill_temporary_mapping_table(&mut self) -> Result<(), Error> { + // create table + self.connection.execute( + &format!( + "create table {schema}.{table}_translation (name text not null, hash text not null);", + schema = self.database_settings.schema, + table = self.database_settings.temp_dataset_table + ), + &[], + )?; + + // fill table + let statement = self.connection.prepare(&format!( + "insert into {schema}.{table}_translation(name, hash) VALUES ($1, $2);", + schema = self.database_settings.schema, + table = self.database_settings.temp_dataset_table + ))?; + for field in self.dataset_fields.iter().chain(&self.unit_fields) { + statement.execute(&[&field.name, &field.hash])?; + } + + Ok(()) + } + + /// Create the temporary unit table + fn create_temporary_unit_table(&mut self, abcd_fields: &AbcdFields) -> Result<(), Error> { + let mut fields = vec![format!( + "{} int not null", + self.database_settings.surrogate_key_column, + )]; + + for field in &self.unit_fields { + let abcd_field = abcd_fields + .value_of(field.name.as_bytes()) + .ok_or_else(|| DatabaseSinkError::InconsistentUnitColumns(field.name.clone()))?; + + let data_type_string = if abcd_field.numeric { + "double precision" + } else { + "text" + }; + + // TODO: enforce/filter not null + // let null_string = if abcd_field.vat_mandatory { "NOT NULL" } else { "" } + let null_string = ""; + + fields.push(format!( + "\"{hash}\" {datatype} {nullable}", + hash = field.hash, + datatype = data_type_string, + nullable = null_string, + )); + } + + self.connection.execute( + &format!( + "CREATE TABLE {schema}.{table} ( {fields} );", + schema = &self.database_settings.schema, + table = self.database_settings.temp_unit_table, + fields = fields.join(",") + ), + &[], + )?; + + Ok(()) + } + + /// Create the temporary dataset table + fn create_temporary_dataset_table(&mut self, abcd_fields: &AbcdFields) -> Result<(), Error> { + let mut fields = vec![ + format!( + "{} int primary key", + self.database_settings.surrogate_key_column, + ), // surrogate key + format!("{} text not null", self.database_settings.dataset_id_column), // id + format!( + "{} text not null", + self.database_settings.dataset_path_column + ), // path + format!( + "{} text not null", + self.database_settings.dataset_landing_page_column + ), // landing page + format!( + "{} text not null", + self.database_settings.dataset_provider_column + ), // provider name + ]; + + for field in &self.dataset_fields { + let abcd_field = abcd_fields + .value_of(field.name.as_bytes()) + .ok_or_else(|| DatabaseSinkError::InconsistentDatasetColumns(field.name.clone()))?; + + let data_type_string = if abcd_field.numeric { + "double precision" + } else { + "text" + }; + + // TODO: enforce/filter not null + // let null_string = if abcd_field.vat_mandatory { "NOT NULL" } else { "" } + let null_string = ""; + + fields.push(format!( + "\"{hash}\" {datatype} {nullable}", + hash = field.hash, + datatype = data_type_string, + nullable = null_string, + )); + } + + self.connection.execute( + &format!( + "CREATE TABLE {schema}.{table} ( {fields} );", + schema = &self.database_settings.schema, + table = self.database_settings.temp_dataset_table, + fields = fields.join(",") + ), + &[], + )?; + + Ok(()) + } + + /// Drop all temporary tables if they exist. 
+ fn drop_temporary_tables(&mut self) -> Result<(), Error> { + for statement in &[ + // unit temp table + format!( + "DROP TABLE IF EXISTS {schema}.{table};", + schema = &self.database_settings.schema, + table = &self.database_settings.temp_unit_table + ), + // dataset temp table + format!( + "DROP TABLE IF EXISTS {schema}.{table};", + schema = &self.database_settings.schema, + table = &self.database_settings.temp_dataset_table + ), + // translation temp table + format!( + "DROP TABLE IF EXISTS {schema}.{table}_translation;", + schema = &self.database_settings.schema, + table = &self.database_settings.temp_dataset_table + ), + ] { + self.connection.execute(statement, &[])?; + } + + Ok(()) + } + + /// Migrate the temporary tables to the persistent tables. + /// Drops the old tables. + pub fn migrate_schema(&mut self) -> Result<(), Error> { + self.create_indexes_and_statistics()?; + + let transaction = self.connection.transaction_with( + postgres::transaction::Config::new() + .isolation_level(postgres::transaction::IsolationLevel::Serializable) + .read_only(false), + )?; + + self.drop_old_tables(&transaction)?; + + self.rename_temporary_tables(&transaction)?; + + self.rename_constraints_and_indexes(&transaction)?; + + self.create_listing_view(&transaction)?; + + transaction.commit()?; + + Ok(()) + } + + /// Drop old persistent tables. + fn drop_old_tables(&self, transaction: &Transaction) -> Result<(), Error> { + for statement in &[ + // listing view + format!( + "DROP VIEW IF EXISTS {schema}.{view_name};", + schema = self.database_settings.schema, + view_name = self.database_settings.listing_view + ), + // unit table + format!( + "DROP TABLE IF EXISTS {schema}.{table};", + schema = self.database_settings.schema, + table = self.database_settings.unit_table + ), + // dataset table + format!( + "DROP TABLE IF EXISTS {schema}.{table};", + schema = self.database_settings.schema, + table = self.database_settings.dataset_table + ), + // translation table + format!( + "DROP TABLE IF EXISTS {schema}.{table}_translation;", + schema = self.database_settings.schema, + table = self.database_settings.dataset_table + ), + ] { + transaction.execute(statement, &[])?; + } + + Ok(()) + } + + /// Rename temporary tables to persistent tables. + fn rename_temporary_tables(&self, transaction: &Transaction) -> Result<(), Error> { + for statement in &[ + // unit table + format!( + "ALTER TABLE {schema}.{temp_table} RENAME TO {table};", + schema = self.database_settings.schema, + temp_table = self.database_settings.temp_unit_table, + table = self.database_settings.unit_table + ), + // dataset table + format!( + "ALTER TABLE {schema}.{temp_table} RENAME TO {table};", + schema = self.database_settings.schema, + temp_table = self.database_settings.temp_dataset_table, + table = self.database_settings.dataset_table + ), + // translation table + format!( + "ALTER TABLE {schema}.{temp_table}_translation RENAME TO {table}_translation;", + schema = self.database_settings.schema, + temp_table = self.database_settings.temp_dataset_table, + table = self.database_settings.dataset_table + ), + ] { + transaction.execute(statement, &[])?; + } + + Ok(()) + } + + /// Rename constraints and indexes from temporary to persistent. 
+ fn rename_constraints_and_indexes(&self, transaction: &Transaction) -> Result<(), Error> { + for statement in &[ + // foreign key + format!( + "ALTER TABLE {schema}.{table} \ + RENAME CONSTRAINT {temp_prefix}_{temp_suffix}_fk TO {prefix}_{suffix}_fk;", + schema = &self.database_settings.schema, + table = &self.database_settings.unit_table, + temp_prefix = &self.database_settings.temp_unit_table, + temp_suffix = &self.database_settings.surrogate_key_column, + prefix = &self.database_settings.unit_table, + suffix = &self.database_settings.surrogate_key_column + ), + // index + format!( + "ALTER INDEX {schema}.{temp_index}_idx RENAME TO {index}_idx;", + schema = &self.database_settings.schema, + temp_index = &self.database_settings.temp_unit_table, + index = &self.database_settings.unit_table + ), + ] { + transaction.execute(statement, &[])?; + } + + Ok(()) + } + + /// Create foreign key relationships, indexes, clustering and statistics on the temporary tables. + fn create_indexes_and_statistics(&mut self) -> Result<(), Error> { + let foreign_key_statement = format!( + "ALTER TABLE {schema}.{unit_table} \ + ADD CONSTRAINT {unit_table}_{dataset_id}_fk \ + FOREIGN KEY ({dataset_id}) REFERENCES {schema}.{dataset_table}({dataset_id});", + schema = &self.database_settings.schema, + unit_table = &self.database_settings.temp_unit_table, + dataset_id = &self.database_settings.surrogate_key_column, + dataset_table = &self.database_settings.temp_dataset_table + ); + debug!("{}", &foreign_key_statement); + self.connection.execute(&foreign_key_statement, &[])?; + let indexed_unit_column_names = self + .database_settings + .unit_indexed_columns + .iter() + .map(Field::from) + .map(|field| field.hash) + .collect::>(); + let unit_index_statement = format!( + "CREATE INDEX {unit_table}_idx ON {schema}.{unit_table} \ + USING btree ({surrogate_key_column} {other_begin}{other}{other_end});", + schema = &self.database_settings.schema, + unit_table = &self.database_settings.temp_unit_table, + surrogate_key_column = &self.database_settings.surrogate_key_column, + other_begin = if indexed_unit_column_names.is_empty() { + "" + } else { + ", \"" + }, + other = indexed_unit_column_names.join("\", \""), + other_end = if indexed_unit_column_names.is_empty() { + "" + } else { + "\"" + }, + ); + debug!("{}", &unit_index_statement); + self.connection.execute(&unit_index_statement, &[])?; + let cluster_statement = format!( + "CLUSTER {unit_table}_idx ON {schema}.{unit_table};", + schema = &self.database_settings.schema, + unit_table = &self.database_settings.temp_unit_table + ); + debug!("{}", &cluster_statement); + self.connection.execute(&cluster_statement, &[])?; + let datasets_analyze_statement = format!( + "VACUUM ANALYZE {schema}.{dataset_table};", + schema = &self.database_settings.schema, + dataset_table = &self.database_settings.temp_dataset_table + ); + debug!("{}", &datasets_analyze_statement); + self.connection.execute(&datasets_analyze_statement, &[])?; + let units_analyze_statement = format!( + "VACUUM ANALYZE {schema}.{unit_table};", + schema = &self.database_settings.schema, + unit_table = &self.database_settings.temp_unit_table + ); + debug!("{}", &units_analyze_statement); + self.connection.execute(&units_analyze_statement, &[])?; + + Ok(()) + } + + /// Create view that provides a listing view + pub fn create_listing_view(&self, transaction: &Transaction) -> Result<(), Error> { + // TODO: replace full names with settings call + + let dataset_title = if let Some(field) = 
self.dataset_fields.iter().find(|field| { + field.name == "/DataSets/DataSet/Metadata/Description/Representation/Title" + }) { + format!("\"{}\"", field.hash) + } else { + "''".to_string() + }; + + let latitude_column = if let Some(field) = self.unit_fields.iter().find(|field| { + field.name == "/DataSets/DataSet/Units/Unit/Gathering/SiteCoordinateSets/SiteCoordinates/CoordinatesLatLong/LatitudeDecimal" + }) { + format!("\"{}\"", field.hash) + } else { + "NULL".to_string() + }; + + let longitude_column = if let Some(field) = self.unit_fields.iter().find(|field| { + field.name == "/DataSets/DataSet/Units/Unit/Gathering/SiteCoordinateSets/SiteCoordinates/CoordinatesLatLong/LongitudeDecimal" + }) { + format!("\"{}\"", field.hash) + } else { + "NULL".to_string() + }; + + let view_statement = format!( + r#" + CREATE VIEW {schema}.{view_name} AS ( + select link, dataset, id, provider, isGeoReferenced as available, isGeoReferenced + from ( + select {dataset_landing_page_column} as link, + {dataset_title} as dataset, + {dataset_id_column} as id, + {dataset_provider_column} as provider, + (SELECT EXISTS( + select * from {schema}.{unit_table} + where {dataset_table}.{surrogate_key_column} = {unit_table}.{surrogate_key_column} + and {latitude_column} is not null + and {longitude_column} is not null + )) as isGeoReferenced + from {schema}.{dataset_table} + ) sub);"#, + schema = self.database_settings.schema, + view_name = self.database_settings.listing_view, + dataset_title = dataset_title, + dataset_landing_page_column = self.database_settings.dataset_landing_page_column, + dataset_id_column = self.database_settings.dataset_id_column, + dataset_provider_column = self.database_settings.dataset_provider_column, + dataset_table = self.database_settings.dataset_table, + unit_table = self.database_settings.unit_table, + surrogate_key_column = self.database_settings.surrogate_key_column, + latitude_column = latitude_column, + longitude_column = longitude_column, + ); + + transaction.execute(&view_statement, &[])?; + + Ok(()) + } + + /// Insert a dataset and its units into the temporary tables. 
+ pub fn insert_dataset(&mut self, abcd_data: &AbcdResult) -> Result<(), Error> { + match self.surrogate_key.for_id(&abcd_data.dataset_id) { + SurrogateKeyType::New(surrogate_key) => { + Self::insert_dataset_metadata( + &self.database_settings, + &self.connection, + self.dataset_fields.as_slice(), + abcd_data, + surrogate_key, + )?; + self.insert_units(&abcd_data, surrogate_key)?; + } + SurrogateKeyType::Existing(surrogate_key) => { + self.insert_units(&abcd_data, surrogate_key)?; + } + } + + Ok(()) + } + + /// Insert the dataset metadata into the temporary schema + fn insert_dataset_metadata( + database_settings: &settings::DatabaseSettings, + connection: &Connection, + dataset_fields: &[Field], + abcd_data: &AbcdResult, + id: u32, + ) -> Result<(), Error> { + let mut values = WriterBuilder::new() + .terminator(csv::Terminator::Any(b'\n')) + .delimiter(b'\t') + .quote(b'"') + .escape(b'"') + .has_headers(false) + .from_writer(vec![]); + let mut columns: Vec<&str> = vec![ + database_settings.surrogate_key_column.as_ref(), + database_settings.dataset_id_column.as_ref(), + database_settings.dataset_path_column.as_ref(), + database_settings.dataset_landing_page_column.as_ref(), + database_settings.dataset_provider_column.as_ref(), + ]; + values.write_field(id.to_string())?; + values.write_field(abcd_data.dataset_id.clone())?; + values.write_field(abcd_data.dataset_path.clone())?; + values.write_field(abcd_data.landing_page.clone())?; + values.write_field(abcd_data.provider_name.clone())?; + for field in dataset_fields { + columns.push(&field.hash); + if let Some(value) = abcd_data.dataset.get(&field.name) { + values.write_field(value.to_string())?; + } else { + values.write_field("")?; + } + } + // terminate record + values.write_record(None::<&[u8]>)?; + + let copy_statement = format!( + "COPY {schema}.{table}(\"{columns}\") FROM STDIN WITH ({options})", + schema = database_settings.schema, + table = database_settings.temp_dataset_table, + columns = columns.join("\",\""), + options = POSTGRES_CSV_CONFIGURATION + ); + // dbg!(&copy_statement); + + let value_string = values.into_inner()?; + // dbg!(String::from_utf8_lossy(value_string.as_slice())); + + let statement = connection.prepare(&copy_statement)?; + statement.copy_in(&[], &mut value_string.as_slice())?; + + Ok(()) + } + + /// Insert the dataset units into the temporary schema + fn insert_units(&mut self, abcd_data: &AbcdResult, id: u32) -> Result<(), Error> { + let mut columns: Vec<String> = vec![self.database_settings.surrogate_key_column.clone()]; + columns.extend(self.unit_fields.iter().map(|field| field.hash.clone())); + + let mut values = WriterBuilder::new() + .terminator(csv::Terminator::Any(b'\n')) + .delimiter(b'\t') + .quote(b'"') + .escape(b'"') + .has_headers(false) + .from_writer(vec![]); + + // append units one by one to tsv + for unit_data in &abcd_data.units { + values.write_field(&id.to_string())?; // put id first + + for field in &self.unit_fields { + if let Some(value) = unit_data.get(&field.name) { + values.write_field(value.to_string())?; + } else { + values.write_field("")?; + } + } + + values.write_record(None::<&[u8]>)?; // terminate record + } + + let copy_statement = format!( + "COPY {schema}.{table}(\"{columns}\") FROM STDIN WITH ({options})", + schema = self.database_settings.schema, + table = self.database_settings.temp_unit_table, + columns = columns.join("\",\""), + options = POSTGRES_CSV_CONFIGURATION + ); + + let statement = self.connection.prepare(&copy_statement)?; + // dbg!(&value_string); + statement.copy_in(&[], &mut
values.into_inner()?.as_slice())?; + + Ok(()) + } +} + +/// An error enum for different storage sink errors. +#[derive(Debug, Fail)] +pub enum DatabaseSinkError { + /// This error occurs when there is an inconsistency between the ABCD dataset data and the sink's columns. + #[fail(display = "Inconsistent dataset columns: {}", 0)] + InconsistentDatasetColumns(String), + /// This error occurs when there is an inconsistency between the ABCD unit data and the sink's columns. + #[fail(display = "Inconsistent unit columns: {}", 0)] + InconsistentUnitColumns(String), +} + +#[cfg(test)] +mod tests { + use super::*; + + use crate::settings::{DatabaseSettings, Settings}; + use crate::test_utils; + use postgres::rows::Rows; + use serde_json::json; + use std::collections::HashMap; + + #[test] + fn schema_creation_leads_to_required_tables() { + let database_settings = retrieve_settings_from_file_and_override_schema(); + let abcd_fields = create_abcd_fields_from_json(&json!([])); + + let database_sink = DatabaseSink::new(&database_settings, &abcd_fields).unwrap(); + + let tables = retrieve_ordered_table_names(&database_sink); + + assert_eq!( + tables, + sorted_vec(vec![ + database_settings.temp_dataset_table.clone(), + database_settings.temp_unit_table.clone(), + format!("{}_translation", database_settings.temp_dataset_table) + ]) + ); + } + + #[test] + fn schema_creation_leads_to_required_columns_in_dataset_table() { + let database_settings = retrieve_settings_from_file_and_override_schema(); + let abcd_fields = create_abcd_fields_from_json(&json!([ + { + "name": "/DataSets/DataSet/TechnicalContacts/TechnicalContact/Name", + "numeric": false, + "vatMandatory": false, + "gfbioMandatory": true, + "globalField": true, + "unit": "" + }, + { + "name": "/DataSets/DataSet/Metadata/Description/Representation/Title", + "numeric": false, + "vatMandatory": false, + "gfbioMandatory": true, + "globalField": true, + "unit": "" + }, + { + "name": "/DataSets/DataSet/Metadata/Description/Representation/URI", + "numeric": false, + "vatMandatory": false, + "gfbioMandatory": true, + "globalField": true, + "unit": "" + }, + ])); + + let database_sink = DatabaseSink::new(&database_settings, &abcd_fields).unwrap(); + + let dataset_table_columns = retrieve_ordered_table_column_names( + &database_sink, + &database_settings.temp_dataset_table, + ); + + let dataset_columns = extract_dataset_fields(&abcd_fields) + .iter() + .map(|field| field.hash.clone()) + .chain(vec![ + database_settings.surrogate_key_column.clone(), + "dataset_id".to_string(), + "dataset_landing_page".to_string(), + "dataset_path".to_string(), + "dataset_provider".to_string(), + ]) + .collect::>(); + + assert!(!dataset_columns.is_empty()); + assert_eq!(dataset_table_columns, sorted_vec(dataset_columns)); + } + + #[test] + fn schema_creation_leads_to_required_columns_in_unit_table() { + let database_settings = retrieve_settings_from_file_and_override_schema(); + let abcd_fields = create_abcd_fields_from_json(&json!([ + { + "name": "/DataSets/DataSet/Units/Unit/UnitID", + "numeric": false, + "vatMandatory": false, + "gfbioMandatory": true, + "globalField": false, + "unit": "" + }, + { + "name": "/DataSets/DataSet/Units/Unit/Gathering/SiteCoordinateSets/SiteCoordinates/CoordinatesLatLong/LongitudeDecimal", + "numeric": true, + "vatMandatory": true, + "gfbioMandatory": true, + "globalField": false, + "unit": "°" + }, + { + "name": "/DataSets/DataSet/Units/Unit/Gathering/SiteCoordinateSets/SiteCoordinates/CoordinatesLatLong/LatitudeDecimal", + "numeric": true, + 
"vatMandatory": true, + "gfbioMandatory": true, + "globalField": false, + "unit": "°" + }, + { + "name": "/DataSets/DataSet/Units/Unit/Gathering/SiteCoordinateSets/SiteCoordinates/CoordinatesLatLong/SpatialDatum", + "numeric": false, + "vatMandatory": false, + "gfbioMandatory": true, + "globalField": false, + "unit": "" + } + ])); + + let database_sink = DatabaseSink::new(&database_settings, &abcd_fields).unwrap(); + + let dataset_table_columns = + retrieve_ordered_table_column_names(&database_sink, &database_settings.temp_unit_table); + + let unit_columns = extract_unit_fields(&abcd_fields) + .iter() + .map(|field| field.hash.clone()) + .chain(vec![database_settings.surrogate_key_column.clone()]) + .collect::>(); + + assert!(!unit_columns.is_empty()); + assert_eq!(dataset_table_columns, sorted_vec(unit_columns)); + } + + #[test] + fn translation_table_contains_entries() { + let database_settings = retrieve_settings_from_file_and_override_schema(); + let abcd_fields = create_abcd_fields_from_json(&json!([ + { + "name": "/DataSets/DataSet/TechnicalContacts/TechnicalContact/Name", + "numeric": false, + "vatMandatory": false, + "gfbioMandatory": true, + "globalField": true, + "unit": "" + }, + { + "name": "/DataSets/DataSet/Metadata/Description/Representation/Title", + "numeric": false, + "vatMandatory": false, + "gfbioMandatory": true, + "globalField": true, + "unit": "" + }, + { + "name": "/DataSets/DataSet/Metadata/Description/Representation/URI", + "numeric": false, + "vatMandatory": false, + "gfbioMandatory": true, + "globalField": true, + "unit": "" + }, + ])); + + let database_sink = DatabaseSink::new(&database_settings, &abcd_fields).unwrap(); + + let expected_translation_table_columns = vec![ + "/DataSets/DataSet/TechnicalContacts/TechnicalContact/Name", + "/DataSets/DataSet/Metadata/Description/Representation/Title", + "/DataSets/DataSet/Metadata/Description/Representation/URI", + ]; + + let queried_translation_table_columns = + retrieve_translation_table_keys(&database_settings, &database_sink); + + assert_eq!( + sorted_vec(expected_translation_table_columns), + sorted_vec(queried_translation_table_columns) + ); + } + + #[test] + fn translation_table_entries_match_table_columns() { + let database_settings = retrieve_settings_from_file_and_override_schema(); + let abcd_fields = create_abcd_fields_from_json(&json!([ + { + "name": "/DataSets/DataSet/TechnicalContacts/TechnicalContact/Name", + "numeric": false, + "vatMandatory": false, + "gfbioMandatory": true, + "globalField": true, + "unit": "" + }, + { + "name": "/DataSets/DataSet/Metadata/Description/Representation/Title", + "numeric": false, + "vatMandatory": false, + "gfbioMandatory": true, + "globalField": true, + "unit": "" + }, + { + "name": "/DataSets/DataSet/Units/Unit/UnitID", + "numeric": false, + "vatMandatory": false, + "gfbioMandatory": true, + "globalField": false, + "unit": "" + }, + ])); + + let database_sink = DatabaseSink::new(&database_settings, &abcd_fields).unwrap(); + + let dataset_table_columns = retrieve_ordered_table_column_names( + &database_sink, + &database_settings.temp_dataset_table, + ); + let unit_table_columns = + retrieve_ordered_table_column_names(&database_sink, &database_settings.temp_unit_table); + + let translation_table_values = + retrieve_translation_table_values(&database_settings, &database_sink); + + for column_name in translation_table_values { + assert!( + dataset_table_columns.contains(&column_name) + || unit_table_columns.contains(&column_name) + ); + } + } + + #[test] + fn 
dataset_table_contains_entry_after_insert() { + let database_settings = retrieve_settings_from_file_and_override_schema(); + let abcd_fields = create_abcd_fields_from_json(&json!([ + { + "name": "DS_TEXT", + "numeric": false, + "vatMandatory": false, + "gfbioMandatory": true, + "globalField": true, + "unit": "" + }, + { + "name": "DS_NUM", + "numeric": true, + "vatMandatory": false, + "gfbioMandatory": true, + "globalField": true, + "unit": "" + }, + { + "name": "UNIT_TEXT", + "numeric": false, + "vatMandatory": false, + "gfbioMandatory": true, + "globalField": false, + "unit": "" + }, + { + "name": "UNIT_NUM", + "numeric": true, + "vatMandatory": false, + "gfbioMandatory": true, + "globalField": false, + "unit": "" + }, + ])); + + let mut database_sink = DatabaseSink::new(&database_settings, &abcd_fields).unwrap(); + + database_sink + .insert_dataset(&AbcdResult { + dataset_id: "TEST_ID".to_string(), + dataset_path: "TEST_PATH".to_string(), + landing_page: "TEST_LANDING_PAGE".to_string(), + provider_name: "TEST_PROVIDER".to_string(), + dataset: { + let mut values = HashMap::new(); + values.insert("DS_TEXT".into(), "FOOBAR".into()); + values.insert("DS_NUM".into(), 42.0.into()); + values + }, + units: vec![ + { + let mut values = HashMap::new(); + values.insert("UNIT_TEXT".into(), "FOO".into()); + values.insert("UNIT_NUM".into(), 13.0.into()); + values + }, + { + let mut values = HashMap::new(); + values.insert("UNIT_TEXT".into(), "BAR".into()); + values.insert("UNIT_NUM".into(), 37.0.into()); + values + }, + ], + }) + .unwrap(); + + assert_eq!( + 1, + number_of_entries(&database_sink, &database_settings.temp_dataset_table) + ); + assert_eq!( + 2, + number_of_entries(&database_sink, &database_settings.temp_unit_table) + ); + + let dataset_result = + retrieve_rows(&mut database_sink, &database_settings.temp_dataset_table); + + let dataset = dataset_result.get(0); + assert_eq!( + "TEST_ID", + dataset.get::<_, String>(database_settings.dataset_id_column.as_str()) + ); + assert_eq!( + "TEST_PATH", + dataset.get::<_, String>(database_settings.dataset_path_column.as_str()) + ); + assert_eq!( + "TEST_LANDING_PAGE", + dataset.get::<_, String>(database_settings.dataset_landing_page_column.as_str()) + ); + assert_eq!( + "TEST_PROVIDER", + dataset.get::<_, String>(database_settings.dataset_provider_column.as_str()) + ); + assert_eq!( + "FOOBAR", + dataset.get::<_, String>(Field::new("DS_TEXT").hash.as_str()) + ); + assert_eq!( + 42.0, + dataset.get::<_, f64>(Field::new("DS_NUM").hash.as_str()) + ); + + let unit_result = retrieve_rows(&mut database_sink, &database_settings.temp_unit_table); + + let unit1 = unit_result.get(0); + assert_eq!( + "FOO", + unit1.get::<_, String>(Field::new("UNIT_TEXT").hash.as_str()) + ); + assert_eq!( + 13.0, + unit1.get::<_, f64>(Field::new("UNIT_NUM").hash.as_str()) + ); + + let unit2 = unit_result.get(1); + assert_eq!( + "BAR", + unit2.get::<_, String>(Field::new("UNIT_TEXT").hash.as_str()) + ); + assert_eq!( + 37.0, + unit2.get::<_, f64>(Field::new("UNIT_NUM").hash.as_str()) + ); + } + + #[test] + fn second_insert_of_same_dataset_does_not_lead_to_second_entry_in_dataset_table() { + let database_settings = retrieve_settings_from_file_and_override_schema(); + let abcd_fields = create_abcd_fields_from_json(&json!([ + { + "name": "DS_TEXT", + "numeric": false, + "vatMandatory": false, + "gfbioMandatory": true, + "globalField": true, + "unit": "" + }, + { + "name": "DS_NUM", + "numeric": true, + "vatMandatory": false, + "gfbioMandatory": true, + "globalField": true, + 
"unit": "" + }, + { + "name": "UNIT_TEXT", + "numeric": false, + "vatMandatory": false, + "gfbioMandatory": true, + "globalField": false, + "unit": "" + }, + { + "name": "UNIT_NUM", + "numeric": true, + "vatMandatory": false, + "gfbioMandatory": true, + "globalField": false, + "unit": "" + }, + ])); + + let mut database_sink = DatabaseSink::new(&database_settings, &abcd_fields).unwrap(); + + database_sink + .insert_dataset(&AbcdResult { + dataset_id: "TEST_ID".to_string(), + dataset_path: "TEST_PATH".to_string(), + landing_page: "TEST_LANDING_PAGE".to_string(), + provider_name: "TEST_PROVIDER".to_string(), + dataset: { + let mut values = HashMap::new(); + values.insert("DS_TEXT".into(), "FOOBAR".into()); + values.insert("DS_NUM".into(), 42.0.into()); + values + }, + units: vec![{ + let mut values = HashMap::new(); + values.insert("UNIT_TEXT".into(), "FOO".into()); + values.insert("UNIT_NUM".into(), 13.0.into()); + values + }], + }) + .unwrap(); + + database_sink + .insert_dataset(&AbcdResult { + dataset_id: "TEST_ID".to_string(), + dataset_path: "TEST_PATH".to_string(), + landing_page: "TEST_LANDING_PAGE".to_string(), + provider_name: "TEST_PROVIDER".to_string(), + dataset: { + let mut values = HashMap::new(); + values.insert("DS_TEXT".into(), "FOOBAR".into()); + values.insert("DS_NUM".into(), 42.0.into()); + values + }, + units: vec![{ + let mut values = HashMap::new(); + values.insert("UNIT_TEXT".into(), "BAR".into()); + values.insert("UNIT_NUM".into(), 37.0.into()); + values + }], + }) + .unwrap(); + + assert_eq!( + 1, + number_of_entries(&database_sink, &database_settings.temp_dataset_table) + ); + assert_eq!( + 2, + number_of_entries(&database_sink, &database_settings.temp_unit_table) + ); + } + + #[test] + fn correct_tables_after_schema_migration() { + let mut database_settings = retrieve_settings_from_file_and_override_schema(); + database_settings.unit_indexed_columns = vec![]; + + let abcd_fields = create_abcd_fields_from_json(&json!([])); + + let mut database_sink = DatabaseSink::new(&database_settings, &abcd_fields).unwrap(); + + database_sink + .insert_dataset(&AbcdResult { + dataset_id: "TEST_ID".to_string(), + dataset_path: "TEST_PATH".to_string(), + landing_page: "TEST_LANDING_PAGE".to_string(), + provider_name: "TEST_PROVIDER".to_string(), + dataset: Default::default(), + units: vec![], + }) + .unwrap(); + + database_sink.migrate_schema().unwrap(); + + let tables = retrieve_ordered_table_names(&database_sink); + + assert_eq!( + tables, + sorted_vec(vec![ + database_settings.dataset_table.clone(), + database_settings.unit_table.clone(), + format!("{}_translation", database_settings.dataset_table), + database_settings.listing_view.clone(), + ]) + ); + } + + #[test] + fn listing_view_contains_entry_after_migration() { + let mut database_settings = retrieve_settings_from_file_and_override_schema(); + database_settings.unit_indexed_columns = vec![]; + + let abcd_fields = create_abcd_fields_from_json(&json!([ + { + "name": "/DataSets/DataSet/Metadata/Description/Representation/Title", + "numeric": false, + "vatMandatory": false, + "gfbioMandatory": true, + "globalField": true, + "unit": "" + }, + { + "name": "/DataSets/DataSet/Units/Unit/Gathering/SiteCoordinateSets/SiteCoordinates/CoordinatesLatLong/LatitudeDecimal", + "numeric": true, + "vatMandatory": false, + "gfbioMandatory": true, + "globalField": false, + "unit": "" + }, + { + "name": "/DataSets/DataSet/Units/Unit/Gathering/SiteCoordinateSets/SiteCoordinates/CoordinatesLatLong/LongitudeDecimal", + "numeric": true, + 
"vatMandatory": false, + "gfbioMandatory": true, + "globalField": false, + "unit": "" + }, + ])); + + let mut database_sink = DatabaseSink::new(&database_settings, &abcd_fields).unwrap(); + + database_sink + .insert_dataset(&AbcdResult { + dataset_id: "TEST_ID".to_string(), + dataset_path: "TEST_PATH".to_string(), + landing_page: "TEST_LANDING_PAGE".to_string(), + provider_name: "TEST_PROVIDER".to_string(), + dataset: { + let mut values = HashMap::new(); + values.insert("/DataSets/DataSet/Metadata/Description/Representation/Title".into(), "FOOBAR".into()); + values + }, + units: vec![ + { + let mut values = HashMap::new(); + values.insert("/DataSets/DataSet/Units/Unit/Gathering/SiteCoordinateSets/SiteCoordinates/CoordinatesLatLong/LatitudeDecimal".into(), 10.0.into()); + values.insert("/DataSets/DataSet/Units/Unit/Gathering/SiteCoordinateSets/SiteCoordinates/CoordinatesLatLong/LongitudeDecimal".into(), 20.0.into()); + values + }, + ], + }) + .unwrap(); + + database_sink.migrate_schema().unwrap(); + + retrieve_ordered_table_column_names(&database_sink, &database_settings.listing_view); + + let rows = database_sink + .connection + .query( + &format!( + r#"SELECT * FROM pg_temp.{LISTING_VIEW}"#, + LISTING_VIEW = database_settings.listing_view + ), + &[], + ) + .unwrap(); + + assert_eq!(rows.len(), 1); + + let row = rows.iter().next().unwrap(); + assert_eq!(row.get::<_, String>("dataset"), "FOOBAR"); + assert_eq!(row.get::<_, String>("id"), "TEST_ID"); + assert_eq!(row.get::<_, String>("link"), "TEST_LANDING_PAGE"); + assert_eq!(row.get::<_, String>("provider"), "TEST_PROVIDER"); + assert!(row.get::<_, bool>("isGeoReferenced")); + } + + fn retrieve_rows(database_sink: &mut DatabaseSink, table_name: &str) -> Rows { + database_sink + .connection + .query( + &format!(r#"SELECT * FROM pg_temp.{TABLE}"#, TABLE = table_name,), + &[], + ) + .unwrap() + } + + fn number_of_entries(database_sink: &DatabaseSink, table_name: &str) -> i32 { + database_sink + .connection + .query( + &format!( + "select count(*)::integer as total from pg_temp.{}", + table_name + ), + &[], + ) + .unwrap() + .get(0) + .get("total") + } + + fn retrieve_translation_table_keys( + database_settings: &DatabaseSettings, + database_sink: &DatabaseSink, + ) -> Vec { + sorted_vec( + database_sink + .connection + .query( + &format!( + "select name from pg_temp.{}_translation;", + database_settings.temp_dataset_table, + ), + &[], + ) + .unwrap() + .iter() + .map(|row| row.get("name")) + .collect::>(), + ) + } + + fn retrieve_translation_table_values( + database_settings: &DatabaseSettings, + database_sink: &DatabaseSink, + ) -> Vec { + sorted_vec( + database_sink + .connection + .query( + &format!( + "select hash from pg_temp.{}_translation;", + database_settings.temp_dataset_table, + ), + &[], + ) + .unwrap() + .iter() + .map(|row| row.get("hash")) + .collect::>(), + ) + } + + fn sorted_vec(mut vec: Vec) -> Vec + where + T: Ord, + { + vec.sort(); + vec + } + + fn retrieve_ordered_table_names(database_sink: &DatabaseSink) -> Vec { + let mut tables = database_sink + .connection + .query( + r#" + SELECT table_name + FROM information_schema.tables + WHERE table_schema = (SELECT nspname FROM pg_namespace WHERE oid = pg_my_temp_schema()) + ; + "#, + &[], + ) + .unwrap() + .iter() + .map(|row| row.get("table_name")) + .collect::>(); + + tables.sort(); + + tables + } + + fn retrieve_ordered_table_column_names( + database_sink: &DatabaseSink, + table_name: &str, + ) -> Vec { + let mut tables = database_sink + .connection + .query( + r#" + SELECT 
column_name + FROM information_schema.columns + WHERE table_schema = (SELECT nspname FROM pg_namespace WHERE oid = pg_my_temp_schema()) + AND table_name = $1 + ; + "#, + &[&table_name.to_string()], + ) + .unwrap() + .iter() + .map(|row| row.get("column_name")) + .collect::>(); + + tables.sort(); + + tables + } + + fn retrieve_settings_from_file_and_override_schema() -> DatabaseSettings { + let mut settings = Settings::new(None).unwrap().database; + settings.schema = "pg_temp".into(); + settings + } + + fn create_abcd_fields_from_json(json: &serde_json::Value) -> AbcdFields { + let fields_file = test_utils::create_temp_file(&json.to_string()); + + AbcdFields::from_path(&fields_file).expect("Unable to create ABCD Fields Spec") + } + + fn extract_dataset_fields(abcd_fields: &AbcdFields) -> Vec { + abcd_fields + .into_iter() + .filter(|field| field.global_field) + .map(|field| field.name.as_ref()) + .map(Field::new) + .collect() + } + + fn extract_unit_fields(abcd_fields: &AbcdFields) -> Vec { + abcd_fields + .into_iter() + .filter(|field| !field.global_field) + .map(|field| field.name.as_ref()) + .map(Field::new) + .collect() + } +} diff --git a/src/storage/field.rs b/src/storage/field.rs new file mode 100644 index 0000000..6d70776 --- /dev/null +++ b/src/storage/field.rs @@ -0,0 +1,33 @@ +use sha1::Sha1; + +pub struct Field { + pub name: String, + pub hash: String, +} + +impl Field { + pub fn new(name: &str) -> Self { + Self { + name: name.into(), + hash: Sha1::from(name.as_bytes()).digest().to_string(), + } + } +} + +impl From<&str> for Field { + fn from(name: &str) -> Self { + Self::new(name) + } +} + +impl From for Field { + fn from(name: String) -> Self { + Self::new(name.as_str()) + } +} + +impl From<&String> for Field { + fn from(name: &String) -> Self { + Self::new(name.as_str()) + } +} diff --git a/src/storage/mod.rs b/src/storage/mod.rs new file mode 100644 index 0000000..7c4d2ab --- /dev/null +++ b/src/storage/mod.rs @@ -0,0 +1,7 @@ +mod database_sink; +mod field; +mod surrogate_key; + +pub use self::database_sink::DatabaseSink; +pub(self) use self::field::Field; +pub(self) use self::surrogate_key::{SurrogateKey, SurrogateKeyType}; diff --git a/src/storage/surrogate_key.rs b/src/storage/surrogate_key.rs new file mode 100644 index 0000000..f73a0c3 --- /dev/null +++ b/src/storage/surrogate_key.rs @@ -0,0 +1,67 @@ +use std::collections::hash_map::Entry::{Occupied, Vacant}; +use std::collections::HashMap; + +#[derive(Debug, PartialEq)] +pub struct SurrogateKey { + id_to_key: HashMap, + next_key: u32, +} + +#[derive(Debug, PartialEq)] +pub enum SurrogateKeyType { + New(u32), + Existing(u32), +} + +impl SurrogateKey { + pub fn new() -> Self { + Self { + id_to_key: Default::default(), + next_key: 1, + } + } + + pub fn for_id(&mut self, id: &str) -> SurrogateKeyType { + match self.id_to_key.entry(id.into()) { + Occupied(entry) => SurrogateKeyType::Existing(*entry.get()), + Vacant(entry) => { + let key = *entry.insert(self.next_key); + self.next_key += 1; + + SurrogateKeyType::New(key) + } + } + } +} + +impl Default for SurrogateKey { + fn default() -> Self { + Self::new() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn new_keys() { + let mut surrogate_key = SurrogateKey::new(); + + for i in 1..=5 { + assert_eq!( + SurrogateKeyType::New(i), + surrogate_key.for_id(&i.to_string()) + ); + } + } + + #[test] + fn existing_key() { + let mut surrogate_key = SurrogateKey::new(); + + assert_eq!(SurrogateKeyType::New(1), surrogate_key.for_id("foo")); + 
assert_eq!(SurrogateKeyType::Existing(1), surrogate_key.for_id("foo")); + assert_eq!(SurrogateKeyType::New(2), surrogate_key.for_id("bar")); + } +} diff --git a/src/test_utils/mod.rs b/src/test_utils/mod.rs new file mode 100644 index 0000000..ddb06c8 --- /dev/null +++ b/src/test_utils/mod.rs @@ -0,0 +1,29 @@ +mod webserver; + +use std::io::Write; + +use tempfile::TempPath; + +pub use self::webserver::MockWebserver; + +pub fn create_temp_file(content: &str) -> TempPath { + create_temp_file_with_suffix("", content) +} + +pub fn create_temp_file_with_suffix(suffix: &str, content: &str) -> TempPath { + let mut file = tempfile::Builder::new() + .suffix(suffix) + .tempfile() + .expect("Unable to create test file."); + + write!(file, "{}", content).expect("Unable to write content to test file."); + + file.into_temp_path() +} + +pub fn create_empty_temp_file() -> TempPath { + tempfile::Builder::new() + .tempfile() + .expect("Unable to create test file.") + .into_temp_path() +} diff --git a/src/test_utils/webserver.rs b/src/test_utils/webserver.rs new file mode 100644 index 0000000..bac0d62 --- /dev/null +++ b/src/test_utils/webserver.rs @@ -0,0 +1,41 @@ +use mockito::{mock, Matcher, Mock}; + +pub struct MockWebserver { + _mock: Mock, +} + +impl MockWebserver { + pub fn from_text(path: &str, method: &str, text: &str) -> Self { + Self { + _mock: mock(method, path).with_body(text).create(), + } + } + + pub fn from_json(path: &str, method: &str, json_string: &str) -> Self { + Self { + _mock: mock(method, path) + .with_header("content-type", "application/json") + .with_body(json_string) + .create(), + } + } + + pub fn from_json_with_json_condition( + path: &str, + method: &str, + json_condition: &str, + json_result: &str, + ) -> Self { + Self { + _mock: mock(method, path) + .match_body(Matcher::JsonString(json_condition.to_string())) + .with_header("content-type", "application/json") + .with_body(json_result) + .create(), + } + } + + pub fn webserver_root_url(&self) -> String { + mockito::server_url() + } +} diff --git a/src/vat_type.rs b/src/vat_type.rs index 6a83b8d..890926a 100644 --- a/src/vat_type.rs +++ b/src/vat_type.rs @@ -1,8 +1,8 @@ -use std::fmt; use std::borrow::Cow; +use std::fmt; /// This enum represents the VAT data types. 
-#[derive(Clone, Debug)] +#[derive(Clone, Debug, PartialEq)] pub enum VatType { Textual(String), Numeric(f64), diff --git a/tests/learning/mockito.rs b/tests/learning/mockito.rs new file mode 100644 index 0000000..2106f29 --- /dev/null +++ b/tests/learning/mockito.rs @@ -0,0 +1,62 @@ +use mockito::{mock, Matcher}; +use reqwest::Client; +use std::collections::HashMap; + +#[test] +fn mockito_expect_body() { + let _webserver = mock("POST", Matcher::Any) + .match_body("FOOBAR") + .with_body("GOTCHA") + .create(); + + let client = Client::new(); + let mut response = client + .post(&mockito::server_url()) + .body("FOOBAR") + .send() + .unwrap(); + + assert_eq!(response.status(), reqwest::StatusCode::OK); + assert_eq!(response.text().unwrap(), "GOTCHA"); +} + +#[test] +fn mockito_expect_json() { + const JSON_STRING: &str = r#"{"foo" : "bar"}"#; + + let _webserver = mock("POST", Matcher::Any) + .match_body(Matcher::JsonString(JSON_STRING.into())) + .with_body("GOTCHA") + .create(); + + let client = Client::new(); + let mut response = client + .post(&mockito::server_url()) + .body(JSON_STRING) + .send() + .unwrap(); + + assert_eq!(response.status(), reqwest::StatusCode::OK); + assert_eq!(response.text().unwrap(), "GOTCHA"); +} + +#[test] +fn mockito_expect_json_from_map() { + let _webserver = mock("POST", Matcher::Any) + .match_body(Matcher::JsonString(r#"{"foo" : "bar"}"#.into())) + .with_body("GOTCHA") + .create(); + + let mut map = HashMap::new(); + map.insert("foo", "bar"); + + let client = Client::new(); + let mut response = client + .post(&mockito::server_url()) + .json(&map) + .send() + .unwrap(); + + assert_eq!(response.status(), reqwest::StatusCode::OK); + assert_eq!(response.text().unwrap(), "GOTCHA"); +} diff --git a/tests/learning/mod.rs b/tests/learning/mod.rs new file mode 100644 index 0000000..4c9d171 --- /dev/null +++ b/tests/learning/mod.rs @@ -0,0 +1 @@ +pub mod mockito; diff --git a/tests/learning_tests.rs b/tests/learning_tests.rs new file mode 100644 index 0000000..a1a55fd --- /dev/null +++ b/tests/learning_tests.rs @@ -0,0 +1 @@ +pub mod learning;
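One note on the `mockito`-based helpers: all mocks are registered on mockito's single process-wide server, so every `MockWebserver` reports the same `webserver_root_url()`, and tests separate endpoints by path and body matcher (this is what `collect_multiple_request_data` relies on). A hypothetical sketch of that pattern; the paths and payloads below are invented:

```rust
use crate::test_utils::MockWebserver;

#[test]
fn mocks_share_one_server() {
    let search = MockWebserver::from_json("/search", "POST", r#"{"ok": true}"#);
    let scroll = MockWebserver::from_json("/scroll", "POST", r#"{"ok": false}"#);

    // Both mocks live on the same mockito server instance.
    assert_eq!(search.webserver_root_url(), scroll.webserver_root_url());

    // Requests are routed to the matching mock by path.
    let mut response = reqwest::Client::new()
        .post(&format!("{}/search", search.webserver_root_url()))
        .send()
        .unwrap();
    let body: serde_json::Value = response.json().unwrap();

    assert_eq!(body, serde_json::json!({ "ok": true }));
}
```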