From 364da934328f5f72d4bc7791311fafd8248e033f Mon Sep 17 00:00:00 2001 From: Jakub Bednar Date: Tue, 2 Apr 2024 14:16:52 +0200 Subject: [PATCH] fix: serialize Pandas NaN values into LineProtocol --- .../client/write/dataframe_serializer.py | 22 +++++++--------- tests/test_WriteApiDataFrame.py | 26 +++++++++++++++++++ 2 files changed, 36 insertions(+), 12 deletions(-) diff --git a/influxdb_client/client/write/dataframe_serializer.py b/influxdb_client/client/write/dataframe_serializer.py index 6121171f..c4dc0964 100644 --- a/influxdb_client/client/write/dataframe_serializer.py +++ b/influxdb_client/client/write/dataframe_serializer.py @@ -20,7 +20,8 @@ def _itertuples(data_frame): def _not_nan(x): - return x == x + from ...extras import pd + return not pd.isna(x) def _any_not_nan(p, indexes): @@ -77,7 +78,7 @@ def __init__(self, data_frame, point_settings, precision=DEFAULT_WRITE_PRECISION # When NaNs are present, the expression looks like this (split # across two lines to satisfy the code-style checker) # - # lambda p: f"""{measurement_name} {"" if math.isnan(p[1]) + # lambda p: f"""{measurement_name} {"" if pd.isna(p[1]) # else f"{keys[0]}={p[1]}"},{keys[1]}={p[2]}i {p[0].value}""" # # When there's a NaN value in column a, we'll end up with a comma at the start of the @@ -175,7 +176,7 @@ def __init__(self, data_frame, point_settings, precision=DEFAULT_WRITE_PRECISION # This column is a tag column. if null_columns.iloc[index]: key_value = f"""{{ - '' if {val_format} == '' or type({val_format}) == float and math.isnan({val_format}) else + '' if {val_format} == '' or pd.isna({val_format}) else f',{key_format}={{str({val_format}).translate(_ESCAPE_STRING)}}' }}""" else: @@ -192,19 +193,16 @@ def __init__(self, data_frame, point_settings, precision=DEFAULT_WRITE_PRECISION # field column has no nulls, we don't run the comma-removal # regexp substitution step. sep = '' if len(field_indexes) == 0 else ',' - if issubclass(value.type, np.integer): - field_value = f"{sep}{key_format}={{{val_format}}}i" - elif issubclass(value.type, np.bool_): - field_value = f'{sep}{key_format}={{{val_format}}}' - elif issubclass(value.type, np.floating): + if issubclass(value.type, np.integer) or issubclass(value.type, np.floating) or issubclass(value.type, np.bool_): + suffix = 'i' if issubclass(value.type, np.integer) else '' if null_columns.iloc[index]: - field_value = f"""{{"" if math.isnan({val_format}) else f"{sep}{key_format}={{{val_format}}}"}}""" + field_value = f"""{{"" if pd.isna({val_format}) else f"{sep}{key_format}={{{val_format}}}{suffix}"}}""" else: - field_value = f'{sep}{key_format}={{{val_format}}}' + field_value = f"{sep}{key_format}={{{val_format}}}{suffix}" else: if null_columns.iloc[index]: field_value = f"""{{ - '' if type({val_format}) == float and math.isnan({val_format}) else + '' if pd.isna({val_format}) else f'{sep}{key_format}="{{str({val_format}).translate(_ESCAPE_STRING)}}"' }}""" else: @@ -229,7 +227,7 @@ def __init__(self, data_frame, point_settings, precision=DEFAULT_WRITE_PRECISION '_ESCAPE_KEY': _ESCAPE_KEY, '_ESCAPE_STRING': _ESCAPE_STRING, 'keys': keys, - 'math': math, + 'pd': pd, }) for k, v in dict(data_frame.dtypes).items(): diff --git a/tests/test_WriteApiDataFrame.py b/tests/test_WriteApiDataFrame.py index 6ea7a98b..1e1f0ad3 100644 --- a/tests/test_WriteApiDataFrame.py +++ b/tests/test_WriteApiDataFrame.py @@ -159,6 +159,32 @@ def test_write_object_field_nan(self): self.assertEqual("measurement val=2i 1586046600000000000", points[1]) + def test_write_missing_values(self): + from influxdb_client.extras import pd + + data_frame = pd.DataFrame({ + "a_bool": [True, None, False], + "b_int": [None, 1, 2], + "c_float": [1.0, 2.0, None], + "d_str": ["a", "b", None], + }) + + data_frame['a_bool'] = data_frame['a_bool'].astype(pd.BooleanDtype()) + data_frame['b_int'] = data_frame['b_int'].astype(pd.Int64Dtype()) + data_frame['c_float'] = data_frame['c_float'].astype(pd.Float64Dtype()) + data_frame['d_str'] = data_frame['d_str'].astype(pd.StringDtype()) + + print(data_frame) + points = data_frame_to_list_of_points( + data_frame=data_frame, + point_settings=PointSettings(), + data_frame_measurement_name='measurement') + + self.assertEqual(3, len(points)) + self.assertEqual("measurement a_bool=True,c_float=1.0,d_str=\"a\" 0", points[0]) + self.assertEqual("measurement b_int=1i,c_float=2.0,d_str=\"b\" 1", points[1]) + self.assertEqual("measurement a_bool=False,b_int=2i 2", points[2]) + def test_write_field_bool(self): from influxdb_client.extras import pd