diff --git a/spade_anomaly_detection/csv_data_loader.py b/spade_anomaly_detection/csv_data_loader.py index d0ae0bf..96f038f 100644 --- a/spade_anomaly_detection/csv_data_loader.py +++ b/spade_anomaly_detection/csv_data_loader.py @@ -30,9 +30,10 @@ """Implements a CSV data loader.""" import collections +import csv import dataclasses import os -from typing import Callable, Dict, Final, Mapping, Optional, Sequence, Tuple +from typing import Callable, Dict, Final, Mapping, Optional, Sequence, Tuple, cast from absl import logging from google.cloud import storage @@ -43,13 +44,18 @@ import tensorflow as tf -# Types are from spade_anomaly_detection/data_utils/feature_metadata.py +# Types are from //cloud/ml/research/data_utils/feature_metadata.py _FEATURES_TYPE: Final[str] = 'FLOAT64' +_SOURCE_LABEL_TYPE: Final[str] = 'STRING' +_SOURCE_LABEL_DEFAULT_VALUE: Final[str] = '-1' _LABEL_TYPE: Final[str] = 'INT64' # Setting the shuffle buffer size to 1M seems to be necessary to get the CSV # reader to provide a diversity of data to the model. 
_SHUFFLE_BUFFER_SIZE: Final[int] = 1_000_000 +_SPLIT_CHAR: Final[str] = ',' + +LabelColumnType = str | list[str] | int | list[int] | None def _get_header_from_input_file(inputs_file: str) -> str: @@ -156,7 +162,7 @@ def from_inputs_file( """ header = _get_header_from_input_file(inputs_file=inputs_file) header = header.replace('\n', '') - all_columns = header.split(',') + all_columns = header.split(_SPLIT_CHAR) if label_column_name not in all_columns: raise ValueError( f'Label column {label_column_name} not found in the header: {header}' @@ -166,7 +172,7 @@ def from_inputs_file( column_names_dict = collections.OrderedDict( zip(all_columns, features_types) ) - column_names_dict[label_column_name] = _LABEL_TYPE + column_names_dict[label_column_name] = _SOURCE_LABEL_DEFAULT_VALUE return ColumnNamesInfo( column_names_dict=column_names_dict, header=header, @@ -202,13 +208,41 @@ def __init__(self, runner_parameters: parameters.RunnerParameters): 'Data input GCS URI is not set in the runner parameters. Please set ' 'the `data_input_gcs_uri` field in the runner parameters.' ) - self.all_labels = [ + self._label_counts = None + self._last_read_metadata = None + + self.all_labels: Final[list[int] | list[str]] = [ self.runner_parameters.positive_data_value, self.runner_parameters.negative_data_value, self.runner_parameters.unlabeled_data_value, ] - self._label_counts = None - self._last_read_metadata = None + # Construct a label remap from string labels to integers. The table is not + # necessary for the case when the labels are all integers. But instead of + # checking if the labels are all integers, we construct the table and use + # it only if labels are strings. The table is lightweight so it does not + # add overhead. + str_labels: list[str] = [str(l) for l in self.all_labels] + # Construct a label remap from string labels to integers. 
+ if self.runner_parameters.labels_are_strings: + orig_labels: list[int] = [ + self.convert_str_to_int(l) for l in self.all_labels + ] + else: + orig_labels: list[int] = cast(list[int], self.all_labels) + # Key is always a string. + orig_map: dict[str, int] = dict(zip(str_labels, orig_labels)) + # Add the empty string to the label remap table. `None` is not added because + # the Data Loader will default to '' if the label is None. + orig_map[''] = cast( + int, + self.convert_str_to_int(self.runner_parameters.unlabeled_data_value) + if self.runner_parameters.labels_are_strings + else self.runner_parameters.unlabeled_data_value, + ) + # Convert to the Tensorflow lookup table. + self._label_remap_table: Final[tf.lookup.StaticHashTable] = ( + self._get_label_remap_table(labels_mapping=orig_map) + ) @property def label_counts(self) -> Dict[int | str, int]: @@ -265,13 +299,17 @@ def get_inputs_metadata( column_names_info=column_names_info, ) + @classmethod def _get_filter_by_label_value_func( - self, - label_column_filter_value: int | list[int] | None, + cls, + label_column_filter_value: LabelColumnType, exclude_label_value: bool = False, ) -> Callable[[tf.Tensor, tf.Tensor], bool]: """Returns a function that filters a record based on the label column value. + This function will return a function that filters a record based on the + label column value whether it is a string or an integer. + Args: label_column_filter_value: The value of the label column to use as a filter. If None, all records are included. 
@@ -286,8 +324,27 @@ def _get_filter_by_label_value_func( """ def filter_func(features: tf.Tensor, label: tf.Tensor) -> bool: # pylint: disable=unused-argument - if not label_column_filter_value: + if label_column_filter_value is None: return True + if ( + isinstance(label, tf.Tensor) + and label.dtype == tf.dtypes.string + or isinstance(label, np.ndarray) + and label.dtype == np.str_ + or isinstance(label, list) + and isinstance(label[0], str) + ): + # If the label dtype is string, convert it to an integer dtype, + # *assuming* that the string is composed of only digits. + try: + label = tf.strings.to_number( + label, tf.dtypes.as_dtype(_LABEL_TYPE.lower()) + ) + except tf.errors.InvalidArgumentError as e: + logging.exception( + 'Failed to convert label %s to integer: %s', label, e + ) + raise e label_cast = tf.cast(label[0], tf.dtypes.as_dtype(_LABEL_TYPE.lower())) label_column_filter_value_cast = tf.cast( label_column_filter_value, label_cast.dtype @@ -300,12 +357,45 @@ def filter_func(features: tf.Tensor, label: tf.Tensor) -> bool: # pylint: disab return filter_func + @classmethod + def convert_str_to_int(cls, value: str) -> int: + """Converts a string integer label to an integer label.""" + if isinstance(value, str) and value.lstrip('-').isdigit(): + return int(value) + elif isinstance(value, int): + return value + else: + raise ValueError( + f'Label {value} of type {type(value)} is not a string integer.' + ) + + @classmethod + def _get_label_remap_table( + cls, labels_mapping: dict[str, int] + ) -> tf.lookup.StaticHashTable: + """Returns a label remap table that converts string labels to integers.""" + # The possible keys are '', '-1', '0', '1'. None is not included because the + # Data Loader will default to '' if the label is None. 
+ keys_tensor = tf.constant( + list(labels_mapping.keys()), + dtype=tf.dtypes.as_dtype(_SOURCE_LABEL_TYPE.lower()), + ) + vals_tensor = tf.constant( + list(labels_mapping.values()), + dtype=tf.dtypes.as_dtype(_LABEL_TYPE.lower()), + ) + label_remap_table = tf.lookup.StaticHashTable( + tf.lookup.KeyValueTensorInitializer(keys_tensor, vals_tensor), + default_value=-1, + ) + return label_remap_table + def load_tf_dataset_from_csv( self, input_path: str, label_col_name: str, batch_size: Optional[int] = None, - label_column_filter_value: Optional[int | list[int]] = None, + label_column_filter_value: LabelColumnType = None, exclude_label_value: bool = False, ) -> tf.data.Dataset: """Convert multiple CSV files to a tf.data.Dataset. @@ -361,7 +451,7 @@ def load_tf_dataset_from_csv( column_defaults=column_defaults, label_name=label_col_name, select_columns=None, - field_delim=',', + field_delim=_SPLIT_CHAR, use_quote_delim=True, na_value='', header=True, @@ -382,6 +472,24 @@ def load_tf_dataset_from_csv( f'Dataset with prefix {self._last_read_metadata.location_prefix} not ' 'created.' ) + + def remap_label(label: str | tf.Tensor) -> int | tf.Tensor: + """Remaps the label to an integer.""" + if isinstance(label, str) or ( + isinstance(label, tf.Tensor) and label.dtype == tf.dtypes.string + ): + return self._label_remap_table.lookup(label) + return label + + # The Dataset can have labels of type int or str. Cast them to int. + dataset = dataset.map( + lambda features, label: (features, remap_label(label)), + num_parallel_calls=tf.data.AUTOTUNE, + deterministic=True, + ) + + # Filter the dataset by label column value. Filtering is applied after + # re-mapping the labels so that the labels are all integers. 
ds_filter_func = self._get_filter_by_label_value_func( label_column_filter_value=label_column_filter_value, exclude_label_value=exclude_label_value, @@ -394,6 +502,7 @@ def combine_features_dict_into_tensor( features: Mapping[str, tf.Tensor], label: tf.Tensor, ) -> Tuple[tf.Tensor, tf.Tensor]: + """Combines the features into a single tensor.""" feature_matrix = tf.squeeze(tf.stack(list(features.values()), axis=1)) feature_matrix = tf.reshape(feature_matrix, (-1,)) feature_matrix = tf.cast( @@ -420,33 +529,51 @@ def combine_features_dict_into_tensor( dataset = dataset.prefetch(tf.data.AUTOTUNE) # This Dataset was just created. Calculate the label distribution. + # Any string labels were already re-mapped to integers. So keys are always + # integers and values are always integers. self._label_counts = self.counts_by_label(dataset) logging.info('Label counts: %s', self._label_counts) return dataset - def counts_by_label( - self, - dataset: tf.data.Dataset, - ) -> Dict[int | str, int]: - """Counts the number of samples in each label class in the dataset.""" + def counts_by_label(self, dataset: tf.data.Dataset) -> Dict[int, int]: + """Counts the number of samples in each label class in the dataset. + + When this function is called, the labels in the Dataset have already been + re-mapped to integers. So all counting operations make this assumption. + + Args: + dataset: The dataset to count the labels in. + + Returns: + A dictionary of label class (as integer) to counts (as integer). + """ @tf.function def count_class( - counts: Dict[int, tf.Tensor], + counts: Dict[int, int], # Keys are always integers. + batch: Tuple[tf.Tensor, tf.Tensor], - ) -> Dict[int, tf.Tensor]: + ) -> Dict[int, int]: _, labels = batch - new_counts = counts.copy() + # Keys are always integers. 
+ new_counts: Dict[int, int] = counts.copy() for i in self.all_labels: - cc = tf.cast(labels == i, tf.int32) - if i in list(new_counts.keys()): - new_counts[i] += tf.reduce_sum(cc) + # This function is called after the Dataset is constructed and the + # labels are re-mapped to integers. So convert the label to an integer. + if isinstance(i, str): + i_for_compare = self.convert_str_to_int(i) + else: + i_for_compare = i + cc: tf.Tensor = tf.cast(labels == i_for_compare, tf.int32) + if i_for_compare in list(new_counts.keys()): + new_counts[i_for_compare] += tf.reduce_sum(cc) else: - new_counts[i] = tf.reduce_sum(cc) + new_counts[i_for_compare] = tf.reduce_sum(cc) return new_counts - initial_state = dict((i, 0) for i in self.all_labels) + initial_state = dict( + (self.convert_str_to_int(i), 0) for i in self.all_labels + ) counts = dataset.reduce( initial_state=initial_state, reduce_func=count_class ) @@ -477,12 +604,16 @@ def get_label_thresholds(self) -> Mapping[str, float]: ) positive_count = self._label_counts[ - self.runner_parameters.positive_data_value + self.convert_str_to_int(self.runner_parameters.positive_data_value) ] labeled_data_record_count = ( - self._label_counts[self.runner_parameters.positive_data_value] - + self._label_counts[self.runner_parameters.negative_data_value] + self._label_counts[ + self.convert_str_to_int(self.runner_parameters.positive_data_value) + ] + + self._label_counts[ + self.convert_str_to_int(self.runner_parameters.negative_data_value) + ] ) positive_threshold = 100 * (positive_count / labeled_data_record_count) @@ -503,6 +634,7 @@ def upload_dataframe_to_gcs( labels: np.ndarray, weights: Optional[np.ndarray] = None, pseudolabel_flags: Optional[np.ndarray] = None, + map_labels_to_bool: bool = False, ) -> None: """Uploads the dataframe to BigQuery, create or replace table. @@ -512,6 +644,8 @@ def upload_dataframe_to_gcs( labels: Numpy array of labels. weights: Optional numpy array of weights. 
pseudolabel_flags: Optional numpy array of pseudolabel flags. + map_labels_to_bool: If True, map labels to bool. This is useful for + uploading data to BigQuery or AutoML for further analysis. Returns: None. @@ -536,6 +670,9 @@ def upload_dataframe_to_gcs( column_names = list( self._last_read_metadata.column_names_info.column_names_dict.keys() ) + # Save a copy of the feature column names. + feature_column_names = column_names.copy() + feature_column_names.remove(self.runner_parameters.label_col_name) # If the weights are provided, add them to the column names and to the # combined data. @@ -566,13 +703,30 @@ def upload_dataframe_to_gcs( column_names.append(self.runner_parameters.label_col_name) complete_dataframe = pd.DataFrame(data=combined_data, columns=column_names) - - # Adjust label column type so that users can go straight to BigQuery or - # AutoML without having to adjust data. Both of these products require a - # boolean or string target column, not integer. - complete_dataframe[self.runner_parameters.label_col_name] = ( - complete_dataframe[self.runner_parameters.label_col_name].astype('bool') - ) + feature_column_dtypes_map = { + c: _FEATURES_TYPE.lower() for c in feature_column_names + } + column_dtypes_map = { + self.runner_parameters.label_col_name: ( + str if self.runner_parameters.labels_are_strings else int + ), + data_loader.WEIGHT_COLUMN_NAME: np.float64, + data_loader.PSEUDOLABEL_FLAG_COLUMN_NAME: np.int64, + } | feature_column_dtypes_map + complete_dataframe = complete_dataframe.astype(column_dtypes_map) + if map_labels_to_bool: + # Adjust label column type so that users can go straight to BigQuery or + # AutoML without having to adjust data. Both of these products require a + # boolean or string target column, not integer. 
+ complete_dataframe[ + self.runner_parameters.label_col_name + ] = complete_dataframe[self.runner_parameters.label_col_name].map({ + self.runner_parameters.positive_data_value: True, + self.runner_parameters.negative_data_value: False, + }) + complete_dataframe[self.runner_parameters.label_col_name] = ( + complete_dataframe[self.runner_parameters.label_col_name].astype(bool) + ) # Adjust pseudolabel flag column type. if pseudolabel_flags is not None: @@ -586,10 +740,12 @@ def upload_dataframe_to_gcs( self.runner_parameters.data_output_gcs_uri, f'pseudo_labeled_batch_{batch}.csv', ) - with tf.io.gfile.GFile( - output_path, - 'w', - ) as f: - complete_dataframe.to_csv(f, index=False, header=True) + with tf.io.gfile.GFile(output_path, 'w') as f: + complete_dataframe.to_csv( + f, + index=False, + header=True, + quoting=csv.QUOTE_NONNUMERIC, + ) if self.runner_parameters.verbose: logging.info('Uploaded pseudo-labeled data to %s', output_path) diff --git a/spade_anomaly_detection/csv_data_loader_test.py b/spade_anomaly_detection/csv_data_loader_test.py index 4ba4202..575dcdf 100644 --- a/spade_anomaly_detection/csv_data_loader_test.py +++ b/spade_anomaly_detection/csv_data_loader_test.py @@ -35,6 +35,8 @@ from typing import Set, Tuple from unittest import mock +from absl import flags + from absl.testing import parameterized import numpy as np import pandas as pd @@ -44,14 +46,18 @@ import tensorflow_datasets as tfds import pytest +# Required for `self.create_tempdir` to work. +if not flags.FLAGS.is_parsed(): + flags.FLAGS.mark_as_parsed() # Test Config. 
_NUMBER_OF_DECIMALS_TO_ROUND = 2 +# pylint: disable=one-element-tuple,g-one-element-tuple def _dataset_to_set_of_nested_tuples( ds: tf.data.Dataset, -) -> Set[Tuple[Tuple[float, ...], Tuple[float]]]: # pylint: disable=g-one-element-tuple +) -> Set[Tuple[Tuple[float, ...], Tuple[float]]]: """Helper to convert a dataset to a tuple of tuples of tuples for tests.""" ds_list = list(ds.as_numpy_iterator()) new_ds_list = [] @@ -77,6 +83,9 @@ def _dataset_to_set_of_nested_tuples( return set(new_ds_list) +# pylint: enable=one-element-tuple,g-one-element-tuple + + @dataclasses.dataclass(frozen=True) class FakeBlob: """Represents a fake GCS blob to be returned by bucket.list_blobs. @@ -102,7 +111,13 @@ def setUp(self): self.storage_client_mock.return_value.bucket.return_value = self.bucket_mock self.header = ["x1", "x2", "y"] - self.data1 = [[0.6, 0.2, -1], [0.1, 0.8, 0], [0.6, 0.9, 1]] + self.data1 = [ + [0.6, 0.2, "-1"], + [0.1, 0.8, "0"], + [0.6, 0.9, "1"], + [0.6, 0.7, ""], + [0.6, 0.3, None], + ] self.data1_df = pd.DataFrame(data=self.data1, columns=self.header) self.csv_file1 = "/dir1/data1.csv" self.csv_file1_content = self.data1_df.to_csv(header=True, index=False) @@ -138,7 +153,7 @@ def test_parse_gcs_uri_returns_bucket_name_prefix_and_suffix( @parameterized.named_parameters( ("incorrect_folder", "bucket/dir/"), - ("too_many_wildcards", "gs://bucket/*/file*.csv") + ("too_many_wildcards", "gs://bucket/*/file*.csv"), ) def test_parse_gcs_uri_incorrect_uri_raises(self, gcs_uri): with self.assertRaises(ValueError): @@ -184,7 +199,7 @@ def test_column_names_info_from_inputs_file_returns_column_names_info(self): header="x1,x2,y", label_column_name="y", column_names_dict=collections.OrderedDict( - [("x1", "FLOAT64"), ("x2", "FLOAT64"), ("y", "INT64")] + [("x1", "FLOAT64"), ("x2", "FLOAT64"), ("y", "-1")] ), num_features=2, ) @@ -197,41 +212,75 @@ def setUp(self): super().setUp() self.header = ["x1", "x2", "y"] self.dir = "dir1/" - self.data1 = [[0.6, 0.2, -1], [0.1, 0.8, 
0], [0.6, 0.9, 1]] + self.data1 = [ + [0.6, 0.2, "-1"], + [0.1, 0.8, "0"], + [0.6, 0.9, "1"], + [0.6, 0.7, ""], + ] self.data1_df = pd.DataFrame(data=self.data1, columns=self.header) self.csv_file1 = f"{self.dir}data1.csv" self.csv_file1_content = self.data1_df.to_csv(header=True, index=False) - self.data2 = [[0.6, 0.7, 1], [0.6, 0.5, 0], [0.6, 0.9, 1], [0.6, 0.2, 1]] + self.data2 = [ + [0.6, 0.7, "1"], + [0.6, 0.3, "0"], + [0.6, 0.4, ""], + [0.6, 0.9, "1"], + [0.6, 0.2, "1"], + ] self.data2_df = pd.DataFrame(data=self.data2, columns=self.header) self.csv_file2 = f"{self.dir}data2.csv" self.csv_file2_content = self.data2_df.to_csv(header=True, index=False) self.data_df = pd.concat([self.data1_df, self.data2_df]) - self.data_df = self.data_df.astype({"y": "bool"}) + # self.data_df = self.data_df.astype({"y": "str"}) + + def test_get_label_remap_table(self): + label_mapping = {"-1": -1, "0": 0, "1": 1, "": -1} + remap_table = csv_data_loader.CsvDataLoader._get_label_remap_table( + label_mapping + ) + + with self.subTest(msg="check_size"): + # 3 original labels and 1 label for "". 
+ self.assertEqual(remap_table.size(), 3 + 1) + + with self.subTest(msg="check_contents"): + remap_keys, remap_values = remap_table.export() + self.assertAllInSet(remap_keys, {b"-1", b"0", b"1", b""}) + self.assertAllInSet(remap_values, {-1, 0, 1}) @parameterized.named_parameters( dict( - testcase_name="no_label_value_no_exclude", + testcase_name="no_string_label_value_no_exclude", label_column_filter_value=None, exclude_label_value=False, - inputs=[([0.6, 0.2], [-1]), ([0.1, 0.8], [0]), ([0.6, 0.9], [1])], + inputs=[ + ([0.6, 0.2], ["-1"]), + ([0.1, 0.8], ["0"]), + ([0.6, 0.9], ["1"]), + ], expected=[True, True, True], ), dict( - testcase_name="positive_label_value_no_exclude", + testcase_name="positive_string_label_value_no_exclude", label_column_filter_value=1, exclude_label_value=False, - inputs=[([0.6, 0.2], [-1]), ([0.1, 0.8], [0]), ([0.6, 0.9], [1])], + inputs=[ + ([0.6, 0.2], ["-1"]), + ([0.1, 0.8], ["0"]), + ([0.6, 0.9], ["1"]), + ], expected=[False, False, True], ), dict( - testcase_name="positive_label_value_exclude", + testcase_name="positive_int_label_value_exclude", label_column_filter_value=1, exclude_label_value=True, inputs=[([0.6, 0.2], [-1]), ([0.1, 0.8], [0]), ([0.6, 0.9], [1])], expected=[True, True, False], ), dict( - testcase_name="pos_and_0_label_value_no_exclude", + testcase_name="pos_and_0__int_label_value_no_exclude", label_column_filter_value=[0, 1], exclude_label_value=False, inputs=[([0.6, 0.2], [-1]), ([0.1, 0.8], [0]), ([0.6, 0.9], [1])], @@ -241,22 +290,7 @@ def setUp(self): def test_get_filter_by_label_value_func( self, label_column_filter_value, exclude_label_value, inputs, expected ): - runner_parameters = parameters.RunnerParameters( - train_setting="PNU", - input_bigquery_table_path=None, - data_input_gcs_uri="gs://bucket/input_path", - output_gcs_uri="gs://bucket/output_path", - label_col_name="y", - positive_data_value=1, - negative_data_value=0, - unlabeled_data_value=-1, - positive_threshold=5, - negative_threshold=95, - ) 
- instance = csv_data_loader.CsvDataLoader( - runner_parameters=runner_parameters - ) - filter_func = instance._get_filter_by_label_value_func( + filter_func = csv_data_loader.CsvDataLoader._get_filter_by_label_value_func( label_column_filter_value=label_column_filter_value, exclude_label_value=exclude_label_value, ) @@ -265,13 +299,75 @@ def test_get_filter_by_label_value_func( got = filter_func(input_f, input_l) self.assertEqual(keep, got) + def test_counts_by_label_returns_expected_counts(self): + dataset = tf.data.Dataset.from_tensor_slices(( + tf.constant( + [ + [0.6, 0.2], + [0.1, 0.8], + [0.6, 0.9], + [0.6, 0.7], + [0.6, 0.5], + [0.6, 0.9], + [0.6, 0.2], + ], + dtype=tf.float64, + ), + tf.constant([[-1], [0], [1], [1], [0], [1], [1]], dtype=tf.int64), + )) + data_loader = csv_data_loader.CsvDataLoader( + runner_parameters=parameters.RunnerParameters( + train_setting="PNU", + input_bigquery_table_path=None, + data_input_gcs_uri="gs://bucket/input_path", + output_gcs_uri="gs://bucket/output_path", + label_col_name="y", + positive_data_value="1", + negative_data_value="0", + unlabeled_data_value="-1", + labels_are_strings=True, + positive_threshold=5, + negative_threshold=95, + verbose=True, + ) + ) + counts = data_loader.counts_by_label(dataset) + expected_counts = { + -1: 1, + 0: 2, + 1: 4, + } + self.assertDictEqual(counts, expected_counts) + # Test the creation of a Dataset from CSV files. Only tests batch_size=1. 
- @pytest.mark.skip(reason="create_tempdir is broken in pytest") + @parameterized.named_parameters( + ( + "labels_are_strings", + "1", + "0", + "-1", + True, + ), + ( + "labels_are_ints", + 1, + 0, + -1, + False, + ), + ) @mock.patch.object(csv_data_loader, "_list_files", autospec=True) @mock.patch.object(csv_data_loader, "_parse_gcs_uri", autospec=True) @mock.patch.object(tf.io.gfile.GFile, "readline", autospec=True) def test_load_tf_dataset_from_csv_returns_expected_dataset( - self, mock_readline, mock_parse_gcs_uri, mock_file_reader + self, + positive_data_value, + negative_data_value, + unlabeled_data_value, + labels_are_strings, + mock_readline, + mock_parse_gcs_uri, + mock_file_reader, ): mock_readline.return_value = ",".join(self.header) tmp_dir = self.create_tempdir("tmp") @@ -301,9 +397,10 @@ def test_load_tf_dataset_from_csv_returns_expected_dataset( data_input_gcs_uri=input_path, output_gcs_uri=f"{input_path}/output", label_col_name="y", - positive_data_value=1, - negative_data_value=0, - unlabeled_data_value=-1, + positive_data_value=positive_data_value, + negative_data_value=negative_data_value, + unlabeled_data_value=unlabeled_data_value, + labels_are_strings=labels_are_strings, positive_threshold=5, negative_threshold=95, verbose=True, @@ -320,21 +417,31 @@ def test_load_tf_dataset_from_csv_returns_expected_dataset( exclude_label_value=False, ) - expected_dataset = tf.data.Dataset.from_tensor_slices(( - tf.constant( - [ - [0.6, 0.2], - [0.1, 0.8], - [0.6, 0.9], - [0.6, 0.7], - [0.6, 0.5], - [0.6, 0.9], - [0.6, 0.2], - ], - dtype=tf.float64, + # [0.6, 0.2, "-1"], [0.1, 0.8, "0"], [0.6, 0.9, "1"], [0.6, 0.7, ""] + # [0.6, 0.7, "1"], [0.6, 0.3, "0"], [0.6, 0.4, ""], [0.6, 0.9, "1"], + # [0.6, 0.2, "1"]] + + expected_dataset = tf.data.Dataset.from_tensor_slices( + ( + tf.constant( + [ + [0.6, 0.2], + [0.1, 0.8], + [0.6, 0.9], + [0.6, 0.7], # blank label. + [0.6, 0.7], + [0.6, 0.3], + [0.6, 0.4], # blank label. 
+ [0.6, 0.9], + [0.6, 0.2], + ], + dtype=tf.float64, + ), + tf.constant( + [[-1], [0], [1], [-1], [1], [0], [-1], [1], [1]], dtype=tf.int64 + ), ), - tf.constant([[-1], [0], [1], [1], [0], [1], [1]], dtype=tf.float64), - )) + ) expected_element_spec = ( tf.TensorSpec(shape=(None, None), dtype=tf.float64), # features @@ -343,10 +450,11 @@ def test_load_tf_dataset_from_csv_returns_expected_dataset( with self.subTest(msg="check_spec_equal"): self.assertTupleEqual(dataset.element_spec, expected_element_spec) + convert_str_to_int = csv_data_loader.CsvDataLoader.convert_str_to_int expected_counts = { - -1: 1, - 0: 2, - 1: 4, + convert_str_to_int(unlabeled_data_value): 3, + convert_str_to_int(negative_data_value): 2, + convert_str_to_int(positive_data_value): 4, } with self.subTest(msg="check_dataset_class_counts"): counts = data_loader.counts_by_label(dataset) @@ -366,8 +474,17 @@ def test_load_tf_dataset_from_csv_returns_expected_dataset( _dataset_to_set_of_nested_tuples(expected_dataset), ) - @pytest.mark.skip(reason="create_tempdir is broken in pytest") - def test_upload_dataframe_to_gcs(self): + @parameterized.named_parameters( + ("string_labels", "1", "0", "-1", "STRING"), + ("int_labels", 1, 0, -1, "INT64"), + ) + def test_upload_dataframe_to_gcs( + self, + positive_data_value, + negative_data_value, + unlabeled_data_value, + label_dtype, + ): tmp_dir = self.create_tempdir("tmp") output_dir = os.path.join(tmp_dir.full_path, self.dir, "output_path") runner_parameters = parameters.RunnerParameters( @@ -376,9 +493,10 @@ def test_upload_dataframe_to_gcs(self): data_input_gcs_uri="gs://bucket/input_path", output_gcs_uri="gs://bucket/model_path", label_col_name="y", - positive_data_value=1, - negative_data_value=0, - unlabeled_data_value=-1, + positive_data_value=positive_data_value, + negative_data_value=negative_data_value, + unlabeled_data_value=unlabeled_data_value, + labels_are_strings=label_dtype == "STRING", positive_threshold=5, negative_threshold=95, 
data_output_gcs_uri=output_dir, @@ -391,7 +509,7 @@ def test_upload_dataframe_to_gcs(self): header="x1,x2,y", label_column_name="y", column_names_dict=collections.OrderedDict( - [("x1", "FLOAT64"), ("x2", "FLOAT64"), ("y", "INT64")] + [("x1", "FLOAT64"), ("x2", "FLOAT64"), ("y", label_dtype)] ), num_features=2, ) @@ -400,8 +518,23 @@ def test_upload_dataframe_to_gcs(self): location_prefix="doesnt_matter_for_uploader", files=["doesnt_matter_for_uploader"], ) - all_features = self.data_df[["x1", "x2"]].to_numpy() - all_labels = self.data_df["y"].to_numpy() + test_data_df = self.data_df.copy() + all_features = test_data_df[["x1", "x2"]].to_numpy() + all_labels = test_data_df["y"] + # Empty label backfill only happens when the DataLoader runs. Here, we + # backfill the empty labels to be all the `unlabeled_data_value`. + all_labels.replace( + "", + str(unlabeled_data_value) + if label_dtype == "STRING" + else unlabeled_data_value, + inplace=True, + ) + if label_dtype == "STRING": + all_labels = all_labels.astype(str) + else: + all_labels = all_labels.astype(int) + all_labels = all_labels.to_numpy() # Create 2 batches of features and labels. features1 = all_features[0:2] labels1 = all_labels[0:2] @@ -428,13 +561,15 @@ def test_upload_dataframe_to_gcs(self): np.repeat([0], len(features2)) .reshape(len(features2), 1) .astype(np.int64) - ) # Upload batch 1. + ) + # Upload batch 1. data_loader.upload_dataframe_to_gcs( batch=1, features=features1, labels=labels1, weights=weights1, pseudolabel_flags=flags1, + map_labels_to_bool=False, ) # Upload batch 2. data_loader.upload_dataframe_to_gcs( @@ -443,40 +578,60 @@ def test_upload_dataframe_to_gcs(self): labels=labels2, weights=weights2, pseudolabel_flags=flags2, + map_labels_to_bool=False, ) # Sorting means batch 1 file will be first. 
files_list = sorted(tf.io.gfile.listdir(output_dir)) self.assertLen(files_list, 2) col_names = ["x1", "x2", "alpha", "is_pseudolabel", "y"] + col_dtypes = [ + "float64", + "float64", + "float64", + "int64", + "str" if label_dtype == "STRING" else "int64", + ] + col_dtypes_map = dict(zip(col_names, col_dtypes)) expected_df1 = pd.concat( [ - self.data_df.iloc[0:2, 0:-1].reset_index(drop=True), + test_data_df.iloc[0:2, 0:-1].reset_index(drop=True), pd.DataFrame(weights1, columns=["alpha"]), pd.DataFrame(flags1, columns=["is_pseudolabel"]), - self.data_df.iloc[0:2, -1].reset_index(drop=True), + test_data_df.iloc[0:2, -1].reset_index(drop=True), ], names=col_names, ignore_index=True, axis=1, ) expected_df1.columns = col_names + expected_df1 = expected_df1.astype(col_dtypes_map) expected_df2 = pd.concat( [ - self.data_df.iloc[2:, 0:-1].reset_index(drop=True), + test_data_df.iloc[2:, 0:-1].reset_index(drop=True), pd.DataFrame(weights2, columns=["alpha"]), pd.DataFrame(flags2, columns=["is_pseudolabel"]), - self.data_df.iloc[2:, -1].reset_index(drop=True), + test_data_df.iloc[2:, -1].reset_index(drop=True), ], ignore_index=True, axis=1, ) expected_df2.columns = col_names + expected_df2 = expected_df2.astype(col_dtypes_map) expected_dfs = [expected_df1, expected_df2] for i, file_name in enumerate(files_list): with self.subTest(msg=f"file_{i}"): file_path = os.path.join(output_dir, file_name) with tf.io.gfile.GFile(file_path, "r") as f: - got_df = pd.read_csv(f, header=0) + got_df = pd.read_csv( + f, + header=0, + index_col=None, + dtype={ + runner_parameters.label_col_name: ( + "str" if label_dtype == "STRING" else "int64" + ) + }, + ) pd.testing.assert_frame_equal( got_df, expected_dfs[i], check_exact=False, check_like=True ) diff --git a/spade_anomaly_detection/data_loader_test.py b/spade_anomaly_detection/data_loader_test.py index 1b9ea11..2fd2f9b 100644 --- a/spade_anomaly_detection/data_loader_test.py +++ b/spade_anomaly_detection/data_loader_test.py @@ -62,6 +62,7 @@ 
def setUp(self): label_col_name='label', positive_data_value=5, negative_data_value=3, + labels_are_strings=False, unlabeled_data_value=-100, positive_threshold=5, negative_threshold=95, diff --git a/spade_anomaly_detection/occ_ensemble.py b/spade_anomaly_detection/occ_ensemble.py index ce59be7..15baa02 100644 --- a/spade_anomaly_detection/occ_ensemble.py +++ b/spade_anomaly_detection/occ_ensemble.py @@ -209,14 +209,16 @@ def _score_unlabeled_data( 'negative_indices': negative_indices } - def pseudo_label(self, - features: np.ndarray, - labels: np.ndarray, - positive_data_value: int, - negative_data_value: Optional[int], - unlabeled_data_value: int, - alpha: float = 1.0, - verbose: Optional[bool] = False) -> Sequence[np.ndarray]: + def pseudo_label( + self, + features: np.ndarray, + labels: np.ndarray, + positive_data_value: str | int, + negative_data_value: str | int | None, + unlabeled_data_value: str | int, + alpha: float = 1.0, + verbose: Optional[bool] = False, + ) -> Sequence[np.ndarray]: """Labels unlabeled data using the trained ensemble of OCCs. 
Args: @@ -270,13 +272,15 @@ def pseudo_label(self, negative_features, ], axis=0) - new_labels = np.concatenate([ - np.ones(len(new_positive_indices)), - np.zeros(len(new_negative_indices)), - np.ones(len(original_positive_idx)), - np.zeros(len(original_negative_idx)) - ], - axis=0) + new_labels = np.concatenate( + [ + np.full(len(new_positive_indices), positive_data_value), + np.full(len(new_negative_indices), negative_data_value), + np.full(len(original_positive_idx), positive_data_value), + np.full(len(original_negative_idx), negative_data_value), + ], + axis=0, + ) weights = np.concatenate([ np.repeat(alpha, len(new_positive_indices)), np.repeat(alpha, len(new_negative_indices)), diff --git a/spade_anomaly_detection/occ_ensemble_test.py b/spade_anomaly_detection/occ_ensemble_test.py index 4a7b673..72d2c03 100644 --- a/spade_anomaly_detection/occ_ensemble_test.py +++ b/spade_anomaly_detection/occ_ensemble_test.py @@ -85,21 +85,39 @@ def test_ensemble_training_no_error( msg='Model count in ensemble not equal to specified ensemble size.', ) - def test_score_unlabeled_data_no_error(self): + @parameterized.named_parameters( + ('labels_are_integers', False), + ('labels_are_strings', True), + ) + def test_score_unlabeled_data_no_error(self, labels_are_strings: bool): batches_per_occ = 1 positive_threshold = 2 negative_threshold = 90 - positive_data_value = 1 - negative_data_value = 0 - unlabeled_data_value = -1 alpha = 0.8 + if labels_are_strings: + positive_data_value = b'1' + negative_data_value = b'0' + unlabeled_data_value = b'-1' + else: + positive_data_value = 1 + negative_data_value = 0 + unlabeled_data_value = -1 + occ_train_dataset = data_loader.load_tf_dataset_from_csv( dataset_name='drug_train_pu_labeled', batch_size=None, - filter_label_value=unlabeled_data_value, + # Coerce `unlabeled_data_value` to int since the test dataset contains + # only integer labels. 
+ filter_label_value=int(unlabeled_data_value), ) features_len = occ_train_dataset.cardinality().numpy() + if labels_are_strings: + # Treat the labels as strings for testing. Note that the test dataset + # contains only integer labels. + occ_train_dataset = occ_train_dataset.map( + lambda x, y: (x, tf.as_string(y)) + ) ensemble_obj = occ_ensemble.GmmEnsemble( n_components=1, @@ -114,18 +132,23 @@ def test_score_unlabeled_data_no_error(self): ) ensemble_obj.fit(occ_train_dataset, batches_per_occ) - features, labels = ( - data_loader.load_tf_dataset_from_csv( - dataset_name='drug_train_pu_labeled', - batch_size=500, - filter_label_value=None, - ) - .as_numpy_iterator() - .next() + occ_train_dataset = data_loader.load_tf_dataset_from_csv( + dataset_name='drug_train_pu_labeled', + batch_size=500, + filter_label_value=None, ) + if labels_are_strings: + # Treat the labels as strings for testing. Note that the test dataset + # contains only integer labels. + occ_train_dataset = occ_train_dataset.map( + lambda x, y: (x, tf.as_string(y)) + ) + features, labels = occ_train_dataset.as_numpy_iterator().next() label_count_before_labeling = len( - np.where((labels == 0) | (labels == 1))[0] + np.where( + (labels == negative_data_value) | (labels == positive_data_value) + )[0] ) updated_features, updated_labels, weights, pseudolabel_flags = ( @@ -140,7 +163,10 @@ def test_score_unlabeled_data_no_error(self): ) label_count_after_labeling = len( - np.where((updated_labels == 0) | (updated_labels == 1))[0] + np.where( + (updated_labels == negative_data_value) + | (updated_labels == positive_data_value) + )[0] ) new_label_count = label_count_after_labeling - label_count_before_labeling diff --git a/spade_anomaly_detection/parameters.py b/spade_anomaly_detection/parameters.py index ccdaf78..1d5dc0b 100644 --- a/spade_anomaly_detection/parameters.py +++ b/spade_anomaly_detection/parameters.py @@ -27,8 +27,8 @@ # See the License for the specific language governing permissions and # 
limitations under the License. -"""Holds dataclasses and enums leveraged by the SPADE algorithm. -""" +"""Holds dataclasses and enums leveraged by the SPADE algorithm.""" + import dataclasses import enum @@ -46,6 +46,19 @@ class TrainSetting(str, enum.Enum): PNU = 'PNU' +# An immutable mapping of label values to their corresponding integer values. +# This supports label values that are the empty string; that value is mapped +# to -1, which is the value used to denote unlabeled data. +# None is not included in this mapping, because the Data Loader will default +# to the empty string if the label column is empty. +labels_mapping: Final[dict[str | None, int]] = { + '': -1, + '1': 1, + '0': 0, + '-1': -1, +} + + @dataclasses.dataclass class RunnerParameters: """Stores runner related parameters for helper functions in the module. @@ -71,6 +84,8 @@ class RunnerParameters: will be added to the end of the folder so that multiple runs of this won't overwrite previous runs. label_col_name: The name of the label column in the input BigQuery table. + labels_are_strings: Whether the labels in the input dataset are strings or + integers. positive_data_value: The value used in the label column to denote positive data - data points that are anomalous. negative_data_value: The value used in the label column to denote negative @@ -163,9 +178,10 @@ class RunnerParameters: data_input_gcs_uri: str output_gcs_uri: str label_col_name: str - positive_data_value: int - negative_data_value: int - unlabeled_data_value: int + positive_data_value: int | str + negative_data_value: int | str + unlabeled_data_value: int | str + labels_are_strings: bool = True positive_threshold: Optional[float] = None negative_threshold: Optional[float] = None ignore_columns: Optional[Sequence[str]] = None @@ -188,6 +204,8 @@ class RunnerParameters: verbose: bool = False def __post_init__(self): + """Validates the parameters and sets default values.""" + # Parameter checks. 
if not (self.input_bigquery_table_path or self.data_input_gcs_uri): raise ValueError( '`input_bigquery_table_path` or `data_input_gcs_uri` must be set.' @@ -212,3 +230,21 @@ def __post_init__(self): '`positive_data_value`, `negative_data_value` and' ' `unlabeled_data_value` must all be different from each other.' ) + if self.labels_are_strings and not self._check_labels_are_strings(): + raise TypeError( + '`labels_are_strings` must be True if `positive_data_value`, ' + '`negative_data_value` and `unlabeled_data_value` are strings.' + ) + # Adjust the labels if needed. + if not self.labels_are_strings: + self.positive_data_value = int(self.positive_data_value) + self.negative_data_value = int(self.negative_data_value) + self.unlabeled_data_value = int(self.unlabeled_data_value) + + def _check_labels_are_strings(self) -> bool: + """Returns True if the labels are strings.""" + return ( + isinstance(self.positive_data_value, str) + and isinstance(self.negative_data_value, str) + and isinstance(self.unlabeled_data_value, str) + ) diff --git a/spade_anomaly_detection/parameters_test.py b/spade_anomaly_detection/parameters_test.py index baccc79..0f8f5bf 100644 --- a/spade_anomaly_detection/parameters_test.py +++ b/spade_anomaly_detection/parameters_test.py @@ -46,9 +46,9 @@ def test_none_required_parameter_raises(self): data_input_gcs_uri='gs://some_bucket/some_data_input_path', output_gcs_uri='gs://some_bucket/some_path', label_col_name='y', - positive_data_value=1, - negative_data_value=0, - unlabeled_data_value=-1, + positive_data_value='1', + negative_data_value='0', + unlabeled_data_value='-1', ) with self.subTest(name='no_input_sources_specified'): with self.assertRaises(ValueError): @@ -58,9 +58,9 @@ def test_none_required_parameter_raises(self): data_input_gcs_uri=None, output_gcs_uri='gs://some_bucket/some_path', label_col_name='y', - positive_data_value=1, - negative_data_value=0, - unlabeled_data_value=-1, + positive_data_value='1', + negative_data_value='0', + 
unlabeled_data_value='-1', ) def test_equal_data_value_parameter_raises(self): @@ -71,9 +71,23 @@ def test_equal_data_value_parameter_raises(self): data_input_gcs_uri=None, output_gcs_uri='gs://some_bucket/some_path', label_col_name='y', + positive_data_value='1', + negative_data_value='0', + unlabeled_data_value='0', + ) + + def test_labels_are_strings_discrepancy_raises(self): + with self.assertRaises(TypeError): + _ = parameters.RunnerParameters( + train_setting=parameters.TrainSetting.PNU, + input_bigquery_table_path='some_project.some_dataset.some_table', + data_input_gcs_uri=None, + output_gcs_uri='gs://some_bucket/some_path', + label_col_name='y', + labels_are_strings=True, positive_data_value=1, negative_data_value=0, - unlabeled_data_value=0, + unlabeled_data_value=-1, ) diff --git a/spade_anomaly_detection/performance_test.py b/spade_anomaly_detection/performance_test.py index 77a82f1..c0c92c5 100644 --- a/spade_anomaly_detection/performance_test.py +++ b/spade_anomaly_detection/performance_test.py @@ -34,9 +34,9 @@ individual modules and functions. 
""" - from unittest import mock +from absl.testing import parameterized from spade_anomaly_detection import csv_data_loader from spade_anomaly_detection import data_loader from spade_anomaly_detection import parameters @@ -66,6 +66,7 @@ def setUp(self): positive_data_value=1, negative_data_value=0, unlabeled_data_value=-1, + labels_are_strings=False, positive_threshold=10, negative_threshold=90, test_label_col_name='y', @@ -240,7 +241,7 @@ def test_spade_auc_performance_pu_single_batch(self): self.assertAlmostEqual(auc, 0.9178, delta=0.02) -class PerformanceTestOnCSVData(tf.test.TestCase): +class PerformanceTestOnCSVData(tf.test.TestCase, parameterized.TestCase): def setUp(self): super().setUp() @@ -262,6 +263,7 @@ def setUp(self): positive_data_value=1, negative_data_value=0, unlabeled_data_value=-1, + labels_are_strings=False, positive_threshold=10, negative_threshold=90, test_label_col_name='y', @@ -399,8 +401,22 @@ def setUp(self): ) ) - def test_spade_auc_performance_pnu_single_batch(self): + @parameterized.named_parameters([ + ('labels_are_ints', False, 1, 0, -1), + ('labels_are_strings', True, '1', '0', '-1'), + ]) + def test_spade_auc_performance_pnu_single_batch( + self, + labels_are_strings: bool, + positive_data_value: str | int, + negative_data_value: str | int, + unlabeled_data_value: str | int, + ): self.runner_parameters.train_setting = parameters.TrainSetting.PNU + self.runner_parameters.labels_are_strings = labels_are_strings + self.runner_parameters.positive_data_value = positive_data_value + self.runner_parameters.negative_data_value = negative_data_value + self.runner_parameters.unlabeled_data_value = unlabeled_data_value self.runner_parameters.positive_threshold = 0.1 self.runner_parameters.negative_threshold = 95 self.runner_parameters.alpha = 0.1 @@ -433,8 +449,22 @@ def test_spade_auc_performance_pnu_single_batch(self): # performance seen on the ~580k row Coertype dataset in the PNU setting. 
self.assertAlmostEqual(auc, 0.9755, delta=0.02) - def test_spade_auc_performance_pu_single_batch(self): + @parameterized.named_parameters([ + ('labels_are_ints', False, 1, 0, -1), + ('labels_are_strings', True, '1', '0', '-1'), + ]) + def test_spade_auc_performance_pu_single_batch( + self, + labels_are_strings: bool, + positive_data_value: str | int, + negative_data_value: str | int, + unlabeled_data_value: str | int, + ): self.runner_parameters.train_setting = parameters.TrainSetting.PU + self.runner_parameters.labels_are_strings = labels_are_strings + self.runner_parameters.positive_data_value = positive_data_value + self.runner_parameters.negative_data_value = negative_data_value + self.runner_parameters.unlabeled_data_value = unlabeled_data_value self.runner_parameters.positive_threshold = 10 self.runner_parameters.negative_threshold = 50 self.runner_parameters.labeling_and_model_training_batch_size = ( diff --git a/spade_anomaly_detection/runner.py b/spade_anomaly_detection/runner.py index 33e5bbb..674f6ad 100644 --- a/spade_anomaly_detection/runner.py +++ b/spade_anomaly_detection/runner.py @@ -78,6 +78,9 @@ class Runner: test_data_loader: An instance of the data loader, useful for performing a number of operations such as loading, filtering, and uploading of bigquery or CSV test data. + int_positive_data_value: The integer value of the positive data value. + int_negative_data_value: The integer value of the negative data value. + int_unlabeled_data_value: The integer value of the unlabeled data value. """ def __init__(self, runner_parameters: parameters.RunnerParameters): @@ -148,6 +151,26 @@ def __init__(self, runner_parameters: parameters.RunnerParameters): else self.runner_parameters.negative_threshold ) + # After the runner is initialized and the Data Loaders are instantiated, + # set local positive, negative and unlabeled data values to int. The integer + # values are used for filtering the data. 
+ # Leave the original values unchanged (whether string or int). + self.int_positive_data_value = ( + csv_data_loader.CsvDataLoader.convert_str_to_int( + self.runner_parameters.positive_data_value + ) + ) + self.int_negative_data_value = ( + csv_data_loader.CsvDataLoader.convert_str_to_int( + self.runner_parameters.negative_data_value + ) + ) + self.int_unlabeled_data_value = ( + csv_data_loader.CsvDataLoader.convert_str_to_int( + self.runner_parameters.unlabeled_data_value + ) + ) + def _get_table_statistics(self) -> Mapping[str, float]: """Gets the statistics for the input table.""" if self.data_format == DataFormat.BIGQUERY: @@ -162,16 +185,16 @@ def _get_table_statistics(self) -> Mapping[str, float]: input_path=self.runner_parameters.data_input_gcs_uri, label_col_name=self.runner_parameters.label_col_name, batch_size=1, - label_column_filter_value=[], ) input_table_statistics = stats_data_loader.get_label_thresholds() return input_table_statistics - def _get_record_count_based_on_labels(self, label_value: int) -> int: + def _get_record_count_based_on_labels(self, label_value: int | str) -> int: """Gets the number of records in the input table. Args: - label_value: The value of the label to use as the filter for records. + label_value: The value of the label to use as the filter for records. Can + be an int or a string. Returns: The count of records. @@ -200,7 +223,10 @@ def _get_record_count_based_on_labels(self, label_value: int) -> int: self.input_data_loader = cast( csv_data_loader.CsvDataLoader, self.input_data_loader ) - label_record_count = self.input_data_loader.label_counts[label_value] + # label_counts is a dictionary of int label values to int record counts. 
+ label_record_count = self.input_data_loader.label_counts[ + csv_data_loader.CsvDataLoader.convert_str_to_int(label_value) + ] return label_record_count def check_data_tables( @@ -307,8 +333,10 @@ def instantiate_and_fit_ensemble( batch_size=batch_size, # Train using negative labeled data and unlabeled data. label_column_filter_value=[ - self.runner_parameters.unlabeled_data_value, - self.runner_parameters.negative_data_value, + # Use the int values for filtering because filtering happens after + # the dataset labels are converted to int. + self.int_unlabeled_data_value, + self.int_negative_data_value, ], ) else: @@ -322,8 +350,10 @@ def instantiate_and_fit_ensemble( batch_size=batch_size, # Train using negative labeled data and unlabeled data. label_column_filter_value=[ - self.runner_parameters.unlabeled_data_value, - self.runner_parameters.negative_data_value, + # Use the int values for filtering because filtering happens after + # the dataset labels are converted to int. + self.int_unlabeled_data_value, + self.int_negative_data_value, ], ) @@ -500,7 +530,7 @@ def _get_test_data(self) -> tf.data.Dataset: # Remove any unlabeled samples that may be in the test set. 
unlabeled_sample_filter = ( f'{self.runner_parameters.test_label_col_name} != ' - f'{self.runner_parameters.unlabeled_data_value}' + f'{self.int_unlabeled_data_value}' ) if self.runner_parameters.where_statements is not None: unlabeled_sample_where_statements = list( @@ -532,15 +562,15 @@ def _get_test_data(self) -> tf.data.Dataset: label_col_name=self.runner_parameters.test_label_col_name, batch_size=None, label_column_filter_value=[ - self.runner_parameters.unlabeled_data_value, + self.int_unlabeled_data_value, ], exclude_label_value=True, ) test_label_counts = self.test_data_loader.label_counts logging.info('Test label counts: %s', test_label_counts) test_dataset_size = ( - test_label_counts[self.runner_parameters.positive_data_value] - + test_label_counts[self.runner_parameters.negative_data_value] + test_label_counts[self.int_positive_data_value] + + test_label_counts[self.int_negative_data_value] ) test_tf_dataset = test_tf_dataset.batch( tf.cast(test_dataset_size, tf.int64) @@ -577,7 +607,7 @@ def preprocess_train_test_split( if self.runner_parameters.train_setting == parameters.TrainSetting.PNU: ground_truth_label_indices = np.where( - labels != self.runner_parameters.unlabeled_data_value + labels != self.int_unlabeled_data_value )[0] label_count = int( len(ground_truth_label_indices) @@ -601,12 +631,8 @@ def preprocess_train_test_split( elif self.runner_parameters.train_setting == parameters.TrainSetting.PU: # Uses all ground truth negative labels and the correct proportion of # positive labels for testing. 
- positive_indices = np.where( - labels == self.runner_parameters.positive_data_value - )[0] - negative_indices = np.where( - labels == self.runner_parameters.negative_data_value - )[0] + positive_indices = np.where(labels == self.int_positive_data_value)[0] + negative_indices = np.where(labels == self.int_negative_data_value)[0] positive_label_count = int( len(positive_indices) @@ -655,8 +681,8 @@ def preprocess_train_test_split( self.test_y = np.array(test_y) if not ( - np.any(test_y == self.runner_parameters.positive_data_value) - and np.any(test_y == self.runner_parameters.negative_data_value) + np.any(test_y == self.int_positive_data_value) + and np.any(test_y == self.int_negative_data_value) ): raise ValueError( 'Positive and negative labels must be in the testing set. Please ' @@ -674,10 +700,8 @@ def preprocess_train_test_split( # Adjust the testing labels to values of 1 and 0 to align with the class # the supervised model is trained on. if self.test_y is not None: - self.test_y[self.test_y == - self.runner_parameters.positive_data_value] = 1 - self.test_y[self.test_y == - self.runner_parameters.negative_data_value] = 0 + self.test_y[self.test_y == self.int_positive_data_value] = 1 + self.test_y[self.test_y == self.int_negative_data_value] = 0 return (train_x, train_y) @@ -744,10 +768,10 @@ def run(self) -> None: logging.info('Total record count: %s', total_record_count) unlabeled_record_count = self._get_record_count_based_on_labels( - self.runner_parameters.unlabeled_data_value + self.int_unlabeled_data_value ) negative_record_count = self._get_record_count_based_on_labels( - self.runner_parameters.negative_data_value + self.int_negative_data_value ) self.check_data_tables( @@ -806,9 +830,9 @@ def run(self) -> None: ensemble_object.pseudo_label( features=train_x, labels=train_y, - positive_data_value=self.runner_parameters.positive_data_value, - negative_data_value=self.runner_parameters.negative_data_value, - 
unlabeled_data_value=self.runner_parameters.unlabeled_data_value, + positive_data_value=self.int_positive_data_value, + negative_data_value=self.int_negative_data_value, + unlabeled_data_value=self.int_unlabeled_data_value, alpha=self.runner_parameters.alpha, verbose=self.runner_parameters.verbose, ) diff --git a/spade_anomaly_detection/runner_test.py b/spade_anomaly_detection/runner_test.py index 7760395..052b9d9 100644 --- a/spade_anomaly_detection/runner_test.py +++ b/spade_anomaly_detection/runner_test.py @@ -27,14 +27,14 @@ # See the License for the specific language governing permissions and # limitations under the License. -"""Tests for runner.""" +"""Tests for the runner for the SPADE algorithm.""" import re from typing import Optional, Pattern, Sequence, Union from unittest import mock +from absl.testing import parameterized import numpy as np -from parameterized import parameterized from spade_anomaly_detection import csv_data_loader from spade_anomaly_detection import data_loader from spade_anomaly_detection import occ_ensemble @@ -45,7 +45,7 @@ import tensorflow_decision_forests as tfdf -class RunnerBQTest(tf.test.TestCase): +class RunnerBQTest(tf.test.TestCase, parameterized.TestCase): def setUp(self): super().setUp() @@ -59,6 +59,7 @@ def setUp(self): positive_data_value=5, negative_data_value=3, unlabeled_data_value=-100, + labels_are_strings=False, positive_threshold=5, negative_threshold=95, ignore_columns=None, @@ -558,8 +559,8 @@ def test_preprocessing_pnu_no_error(self, mock_pseudo_label): self.runner_parameters.unlabeled_data_value, runner_object.test_y ) - @parameterized.expand( - [parameters.TrainSetting.PNU, parameters.TrainSetting.PU] + @parameterized.named_parameters( + [('pnu', parameters.TrainSetting.PNU), ('pu', parameters.TrainSetting.PU)] ) @mock.patch.object(occ_ensemble.GmmEnsemble, 'pseudo_label', autospec=True) def test_preprocessing_array_sizes_no_error( @@ -811,11 +812,11 @@ def test_evaluation_dataset_batch_training(self): * 
self.runner_parameters.test_dataset_holdout_fraction, ) - @parameterized.expand([ - (None, 99, 0.01, 99), - (5, None, 5, 98), - (None, None, 0.01, 98), - (5, 99, 5, 99), + @parameterized.named_parameters([ + ('no_pos_threshold', None, 99, 0.01, 99), + ('no_neg_threshold', 5, None, 5, 98), + ('no_thresholds', None, None, 0.01, 98), + ('all_thresholds', 5, 99, 5, 99), ]) @mock.patch.object( data_loader.DataLoader, 'get_label_thresholds', autospec=True @@ -852,7 +853,7 @@ def test_threshold_parameter_initialization_positive_threshold_set_no_error( ) -class RunnerCSVTest(tf.test.TestCase): +class RunnerCSVTest(tf.test.TestCase, parameterized.TestCase): def setUp(self): super().setUp() @@ -866,6 +867,7 @@ def setUp(self): positive_data_value=5, negative_data_value=3, unlabeled_data_value=-100, + labels_are_strings=False, positive_threshold=5, negative_threshold=95, ignore_columns=None, @@ -1030,7 +1032,276 @@ def _create_mock_datasets(self) -> None: ] self.mock_label_counts.return_value = self.label_counts - def test_runner_supervised_model_fit_with_csv_data(self): + @parameterized.named_parameters([ + ('labels_are_strings_false', False, 5, 3, -100), + ('labels_are_strings_true', True, '5', '3', '-100'), + ]) + def test_csv_runner_csv_data_loader_no_error( + self, + labels_are_strings: bool, + positive_data_value: str | int, + negative_data_value: str | int, + unlabeled_data_value: str | int, + ): + self.runner_parameters.labels_are_strings = labels_are_strings + self.runner_parameters.positive_data_value = positive_data_value + self.runner_parameters.negative_data_value = negative_data_value + self.runner_parameters.unlabeled_data_value = unlabeled_data_value + runner_object = runner.Runner(self.runner_parameters) + runner_object.run() + + with self.subTest(name='StatsDataset'): + self.mock_load_tf_dataset_from_csv.assert_any_call( + mock.ANY, + input_path=self.runner_parameters.data_input_gcs_uri, + label_col_name=self.runner_parameters.label_col_name, + batch_size=1, 
+ ) + + # Check the batch size here to ensure we are dividing the dataset into 5 + # shards. + with self.subTest(name='OccDataset'): + self.mock_load_tf_dataset_from_csv.assert_any_call( + mock.ANY, + input_path=self.runner_parameters.data_input_gcs_uri, + label_col_name=self.runner_parameters.label_col_name, + # Verify that both negative and unlabeled samples are used. + label_column_filter_value=[ + runner_object.int_unlabeled_data_value, + runner_object.int_negative_data_value, + ], + # Verify that batch size is computed with both negative and unlabeled + # sample counts. + batch_size=int( + (self.unlabeled_examples + self.negative_examples) + // self.runner_parameters.ensemble_count + ), + ) + # Assert that the data loader is also called to fetch all records. + with self.subTest(name='InferenceAndSupervisedDataset'): + self.mock_load_tf_dataset_from_csv.assert_any_call( + mock.ANY, + input_path=self.runner_parameters.data_input_gcs_uri, + label_col_name=self.runner_parameters.label_col_name, + batch_size=self.all_examples, + ) + + @parameterized.named_parameters([ + ('labels_are_strings_false', False, 5, 3, -100), + ('labels_are_strings_true', True, '5', '3', '-100'), + ]) + def test_csv_runner_supervised_model_fit( + self, + labels_are_strings: bool, + positive_data_value: str | int, + negative_data_value: str | int, + unlabeled_data_value: str | int, + ): + self.runner_parameters.labels_are_strings = labels_are_strings + self.runner_parameters.positive_data_value = positive_data_value + self.runner_parameters.negative_data_value = negative_data_value + self.runner_parameters.unlabeled_data_value = unlabeled_data_value + self.runner_parameters.alpha = 0.8 + self.runner_parameters.negative_threshold = 0 + + runner_object = runner.Runner(self.runner_parameters) + runner_object.run() + + supervised_model_actual_kwargs = ( + self.mock_supervised_model_train.call_args.kwargs + ) + + with self.subTest('NoUnlabeledData'): + self.assertNotIn( + 
runner_object.int_unlabeled_data_value, + supervised_model_actual_kwargs['labels'], + msg='Unlabeled data was used to fit the supervised model.', + ) + with self.subTest('LabelWeights'): + self.assertIn( + self.runner_parameters.alpha, + supervised_model_actual_kwargs['weights'], + ) + + @parameterized.named_parameters([ + ('labels_are_strings_false', False, 5, 3, -100), + ('labels_are_strings_true', True, '5', '3', '-100'), + ]) + def test_csv_supervised_model_evaluation_no_error( + self, + labels_are_strings: bool, + positive_data_value: str | int, + negative_data_value: str | int, + unlabeled_data_value: str | int, + ): + self.runner_parameters.labels_are_strings = labels_are_strings + self.runner_parameters.positive_data_value = positive_data_value + self.runner_parameters.negative_data_value = negative_data_value + self.runner_parameters.unlabeled_data_value = unlabeled_data_value + runner_obj = runner.Runner(self.runner_parameters) + runner_obj.run() + + evaluate_arguments = self.mock_supervised_model_evaluate.call_args.kwargs + + with self.subTest(name='TestLabels'): + self.assertNotIn( + runner_obj.int_unlabeled_data_value, evaluate_arguments['y'] + ) + self.assertIn(1, evaluate_arguments['y']) + self.assertIn(0, evaluate_arguments['y']) + with self.subTest(name='FeaturesNotNull'): + self.assertIsNotNone(evaluate_arguments['x']) + + @parameterized.named_parameters([ + ('labels_are_strings_false', False, 5, 3, -100), + ('labels_are_strings_true', True, '5', '3', '-100'), + ]) + def test_csv_proprocessing_inputs_supervised_model_train( + self, + labels_are_strings: bool, + positive_data_value: str | int, + negative_data_value: str | int, + unlabeled_data_value: str | int, + ): + self.runner_parameters.labels_are_strings = labels_are_strings + self.runner_parameters.positive_data_value = positive_data_value + self.runner_parameters.negative_data_value = negative_data_value + self.runner_parameters.unlabeled_data_value = unlabeled_data_value + runner_object = 
runner.Runner(self.runner_parameters) + runner_object.run() + + mmock_train_feature_value = ( + self.mock_supervised_model_train.call_args.kwargs['labels'] + ) + + self.assertNotIn( + runner_object.int_unlabeled_data_value, mmock_train_feature_value + ) + + @parameterized.named_parameters([ + ('labels_are_strings_false', False, 5, 3, -100), + ('labels_are_strings_true', True, '5', '3', '-100'), + ]) + def test_csv_upload_only_setting_true_no_error( + self, + labels_are_strings: bool, + positive_data_value: str | int, + negative_data_value: str | int, + unlabeled_data_value: str | int, + ): + self.runner_parameters.labels_are_strings = labels_are_strings + self.runner_parameters.positive_data_value = positive_data_value + self.runner_parameters.negative_data_value = negative_data_value + self.runner_parameters.unlabeled_data_value = unlabeled_data_value + self.runner_parameters.upload_only = True + self.runner_parameters.data_output_gcs_uri = ( + 'gs://output_bucket/output_pseudolabels_folder' + ) + + runner_object = runner.Runner(self.runner_parameters) + runner_object.run() + + with self.subTest('SupervisedModelNotCalled'): + self.mock_supervised_model_train.assert_not_called() + self.mock_supervised_model_evaluate.assert_not_called() + self.mock_supervised_model_save.assert_not_called() + + with self.subTest('CSVUploadCalled'): + self.mock_csv_upload.assert_called_once() + + @parameterized.named_parameters([ + ('labels_are_strings_false', False, 5, 3, -100), + ('labels_are_strings_true', True, '5', '3', '-100'), + ]) + def test_csv_upload_only_setting_true_throw_error_no_gcs_uri( + self, + labels_are_strings: bool, + positive_data_value: str | int, + negative_data_value: str | int, + unlabeled_data_value: str | int, + ): + self.runner_parameters.labels_are_strings = labels_are_strings + self.runner_parameters.positive_data_value = positive_data_value + self.runner_parameters.negative_data_value = negative_data_value + self.runner_parameters.unlabeled_data_value = 
unlabeled_data_value + self.runner_parameters.upload_only = True + self.runner_parameters.output_bigquery_table_path = '' + self.runner_parameters.data_output_gcs_uri = '' + runner_object = runner.Runner(self.runner_parameters) + + with self.assertRaisesRegex( + ValueError, + r'output_bigquery_table_path or data_output_gcs_uri needs to be ' + r'specified in', + ): + runner_object.run() + + @parameterized.named_parameters([ + ('labels_are_strings_false', False, 5, 3, -100), + ('labels_are_strings_true', True, '5', '3', '-100'), + ]) + def test_csv_upload_only_false_no_error( + self, + labels_are_strings: bool, + positive_data_value: str | int, + negative_data_value: str | int, + unlabeled_data_value: str | int, + ): + self.runner_parameters.labels_are_strings = labels_are_strings + self.runner_parameters.positive_data_value = positive_data_value + self.runner_parameters.negative_data_value = negative_data_value + self.runner_parameters.unlabeled_data_value = unlabeled_data_value + self.runner_parameters.upload_only = False + self.runner_parameters.data_output_gcs_uri = ( + 'gs://output_bucket/output_pseudolabels_folder' + ) + + runner_object = runner.Runner(self.runner_parameters) + runner_object.run() + + with self.subTest('SupervisedModelCalled'): + self.mock_supervised_model_train.assert_called_once() + self.mock_supervised_model_evaluate.assert_called_once() + self.mock_supervised_model_save.assert_called_once() + + with self.subTest('CSVUploadCalled'): + self.mock_csv_upload.assert_called_once() + + def test_csv_runner_supervised_model_fit_with_csv_data_int_labels(self): + self.runner_parameters.labels_are_strings = False + self.runner_parameters.positive_data_value = 5 + self.runner_parameters.negative_data_value = 3 + self.runner_parameters.unlabeled_data_value = -100 + self.runner_parameters.alpha = 0.8 + self.runner_parameters.negative_threshold = 0 + + runner_object = runner.Runner(self.runner_parameters) + runner_object.run() + + 
supervised_model_actual_kwargs = ( + self.mock_supervised_model_train.call_args.kwargs + ) + + with self.subTest('NoUnlabeledData'): + self.assertNotIn( + runner_object.int_unlabeled_data_value, + supervised_model_actual_kwargs['labels'], + msg='Unlabeled data was used to fit the supervised model.', + ) + with self.subTest('LabelWeights'): + self.assertIn( + self.runner_parameters.alpha, + supervised_model_actual_kwargs['weights'], + ) + + with self.subTest('CSVUploadNotCalled'): + self.mock_csv_upload.assert_not_called() + + def test_csv_runner_supervised_model_fit_with_csv_data_string_labels(self): + self.runner_parameters.labels_are_strings = True + self.runner_parameters.positive_data_value = '5' + self.runner_parameters.negative_data_value = '3' + self.runner_parameters.unlabeled_data_value = '-100' self.runner_parameters.alpha = 0.8 self.runner_parameters.negative_threshold = 0 @@ -1053,6 +1324,9 @@ def test_runner_supervised_model_fit_with_csv_data(self): supervised_model_actual_kwargs['weights'], ) + with self.subTest('CSVUploadNotCalled'): + self.mock_csv_upload.assert_not_called() + if __name__ == '__main__': tf.test.main() diff --git a/spade_anomaly_detection/supervised_model_test.py b/spade_anomaly_detection/supervised_model_test.py index da617df..9b98113 100644 --- a/spade_anomaly_detection/supervised_model_test.py +++ b/spade_anomaly_detection/supervised_model_test.py @@ -29,34 +29,42 @@ """Tests for supervised models.""" + from unittest import mock +from absl.testing import parameterized from spade_anomaly_detection import data_loader from spade_anomaly_detection import supervised_model - import tensorflow as tf import tensorflow_decision_forests as tfdf -def _get_labeled_dataset() -> tf.data.Dataset: +def _get_labeled_dataset(labels_are_strings: bool = False) -> tf.data.Dataset: """Loads the thyroid_labeled dataset for model performance testing. + Args: + labels_are_strings: Whether the labels are strings or integers. 
+ Returns: TensorFlow dataset with X, y split. """ features, labels = data_loader.load_dataframe('thyroid_labeled') - labeled_dataset = ( - tf.data.Dataset.from_tensor_slices((features, labels)) - .batch(100) - .as_numpy_iterator() - .next() - ) + original_dataset = tf.data.Dataset.from_tensor_slices((features, labels)) + if labels_are_strings: + # Mimic the behavior of the CsvDataLoader by converting the labels to + # strings. The string labels are converted back to integers in the + # Data loader. + # Treat the labels as strings for testing. Note that the test dataset + # contains only integer labels. + original_dataset = original_dataset.map(lambda x, y: (x, tf.as_string(y))) + + labeled_dataset = original_dataset.batch(100).as_numpy_iterator().next() return labeled_dataset -class SupervisedModelsTest(tf.test.TestCase): +class SupervisedModelsTest(tf.test.TestCase, parameterized.TestCase): def test_supervised_model_create_no_error(self): params = supervised_model.RandomForestParameters(max_depth=20) @@ -64,28 +72,51 @@ def test_supervised_model_create_no_error(self): self.assertEqual(model_instance.supervised_parameters.max_depth, 20) + @parameterized.named_parameters( + ('labels_are_integers', False), + ('labels_are_strings', True), + ) @mock.patch.object(tfdf.keras.RandomForestModel, 'fit', autospec=True) - def test_train_supervised_model_no_error(self, mock_fit): + def test_train_supervised_model_no_error( + self, labels_are_strings: bool, mock_fit: mock.Mock + ): params = supervised_model.RandomForestParameters() model = supervised_model.RandomForestModel(parameters=params) - features, labels = _get_labeled_dataset() + features, labels = _get_labeled_dataset(labels_are_strings) - model.train(features=features, labels=labels) + # The CsvDataLoader converts the labels to integers, while the BQDataLoader + # assumes that the labels are integers. So for this test, convert the labels + # to integers here. 
+ model.train(features=features, labels=labels.astype(int)) mock_fit_actual_call_args = mock_fit.call_args.kwargs - expected_fit_call_args = {'x': features, 'y': labels, 'sample_weight': None} + # Convert the labels to integers for the expected call args. + expected_fit_call_args = { + 'x': features, + 'y': labels.astype(int), + 'sample_weight': None, + } self.assertDictEqual(mock_fit_actual_call_args, expected_fit_call_args) + @parameterized.named_parameters( + ('labels_are_integers', False), + ('labels_are_strings', True), + ) @mock.patch.object(tfdf.keras.RandomForestModel, 'save', autospec=True) - def test_model_saving_no_error(self, mock_tfdf_save): - job_dir = 'gs://test_bucket/test_folder/' + def test_model_saving_no_error( + self, labels_are_strings: bool, mock_tfdf_save: mock.Mock + ): + job_dir: str = 'gs://test_bucket/test_folder/' params = supervised_model.RandomForestParameters() model = supervised_model.RandomForestModel(parameters=params) - features, labels = _get_labeled_dataset() + features, labels = _get_labeled_dataset(labels_are_strings) - model.train(features=features, labels=labels) + # The CsvDataLoader converts the labels to integers, while the BQDataLoader + # assumes that the labels are integers. So for this test, convert the labels + # to integers here. + model.train(features=features, labels=labels.astype(int)) model.save(job_dir) mock_tfdf_save.assert_called_once_with(model.supervised_model, job_dir) diff --git a/spade_anomaly_detection/task.py b/spade_anomaly_detection/task.py index ca6452a..c47488e 100644 --- a/spade_anomaly_detection/task.py +++ b/spade_anomaly_detection/task.py @@ -104,21 +104,30 @@ help="The name of the label column in BigQuery.", ) -_POSITIVE_DATA_VALUE = flags.DEFINE_integer( +_LABELS_ARE_STRINGS = flags.DEFINE_bool( + "labels_are_strings", + default=True, + required=False, + help=( + "Set to True if the labels are strings. Otherwise the labels are ints." 
+ ), +) + +_POSITIVE_DATA_VALUE = flags.DEFINE_string( "positive_data_value", default=None, required=True, help="The column value used to define an anomalous (positive) data point.", ) -_NEGATIVE_DATA_VALUE = flags.DEFINE_integer( +_NEGATIVE_DATA_VALUE = flags.DEFINE_string( "negative_data_value", default=None, required=True, help="The column value used to define a normal (negative) data point.", ) -_UNLABELED_DATA_VALUE = flags.DEFINE_integer( +_UNLABELED_DATA_VALUE = flags.DEFINE_string( "unlabeled_data_value", default=None, required=True, @@ -372,6 +381,7 @@ def main(argv: Sequence[str]) -> None: positive_data_value=_POSITIVE_DATA_VALUE.value, negative_data_value=_NEGATIVE_DATA_VALUE.value, unlabeled_data_value=_UNLABELED_DATA_VALUE.value, + labels_are_strings=_LABELS_ARE_STRINGS.value, positive_threshold=_POSITIVE_THRESHOLD.value, negative_threshold=_NEGATIVE_THRESHOLD.value, ignore_columns=_IGNORE_COLUMNS.value, @@ -401,5 +411,6 @@ def main(argv: Sequence[str]) -> None: try: app.run(main) except Exception as e: + logging.error(str(e)) logging.shutdown() - raise e + raise