Skip to content

Commit

Permalink
fix: serialize Pandas NaN values into LineProtocol
Browse files Browse the repository at this point in the history
  • Loading branch information
bednar committed Apr 2, 2024
1 parent a645ea9 commit 364da93
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 12 deletions.
22 changes: 10 additions & 12 deletions influxdb_client/client/write/dataframe_serializer.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ def _itertuples(data_frame):


def _not_nan(x):
    """Return True when the scalar ``x`` is not a missing value.

    The check is delegated to ``pd.isna`` instead of the ``x == x``
    self-comparison trick: comparing ``pd.NA`` with itself yields ``pd.NA``,
    which cannot be coerced to a bool, so the comparison only works for
    float NaN.
    """
    # Imported lazily so that pandas is only required when DataFrames are
    # actually being serialized.
    from ...extras import pd
    return not pd.isna(x)


def _any_not_nan(p, indexes):
Expand Down Expand Up @@ -77,7 +78,7 @@ def __init__(self, data_frame, point_settings, precision=DEFAULT_WRITE_PRECISION
# When NaNs are present, the expression looks like this (split
# across two lines to satisfy the code-style checker)
#
# lambda p: f"""{measurement_name} {"" if math.isnan(p[1])
# lambda p: f"""{measurement_name} {"" if pd.isna(p[1])
# else f"{keys[0]}={p[1]}"},{keys[1]}={p[2]}i {p[0].value}"""
#
# When there's a NaN value in column a, we'll end up with a comma at the start of the
Expand Down Expand Up @@ -175,7 +176,7 @@ def __init__(self, data_frame, point_settings, precision=DEFAULT_WRITE_PRECISION
# This column is a tag column.
if null_columns.iloc[index]:
key_value = f"""{{
'' if {val_format} == '' or type({val_format}) == float and math.isnan({val_format}) else
'' if {val_format} == '' or pd.isna({val_format}) else
f',{key_format}={{str({val_format}).translate(_ESCAPE_STRING)}}'
}}"""
else:
Expand All @@ -192,19 +193,16 @@ def __init__(self, data_frame, point_settings, precision=DEFAULT_WRITE_PRECISION
# field column has no nulls, we don't run the comma-removal
# regexp substitution step.
sep = '' if len(field_indexes) == 0 else ','
if issubclass(value.type, np.integer):
field_value = f"{sep}{key_format}={{{val_format}}}i"
elif issubclass(value.type, np.bool_):
field_value = f'{sep}{key_format}={{{val_format}}}'
elif issubclass(value.type, np.floating):
if issubclass(value.type, np.integer) or issubclass(value.type, np.floating) or issubclass(value.type, np.bool_):
suffix = 'i' if issubclass(value.type, np.integer) else ''
if null_columns.iloc[index]:
field_value = f"""{{"" if math.isnan({val_format}) else f"{sep}{key_format}={{{val_format}}}"}}"""
field_value = f"""{{"" if pd.isna({val_format}) else f"{sep}{key_format}={{{val_format}}}{suffix}"}}"""
else:
field_value = f'{sep}{key_format}={{{val_format}}}'
field_value = f"{sep}{key_format}={{{val_format}}}{suffix}"
else:
if null_columns.iloc[index]:
field_value = f"""{{
'' if type({val_format}) == float and math.isnan({val_format}) else
'' if pd.isna({val_format}) else
f'{sep}{key_format}="{{str({val_format}).translate(_ESCAPE_STRING)}}"'
}}"""
else:
Expand All @@ -229,7 +227,7 @@ def __init__(self, data_frame, point_settings, precision=DEFAULT_WRITE_PRECISION
'_ESCAPE_KEY': _ESCAPE_KEY,
'_ESCAPE_STRING': _ESCAPE_STRING,
'keys': keys,
'math': math,
'pd': pd,
})

for k, v in dict(data_frame.dtypes).items():
Expand Down
26 changes: 26 additions & 0 deletions tests/test_WriteApiDataFrame.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,32 @@ def test_write_object_field_nan(self):
self.assertEqual("measurement val=2i 1586046600000000000",
points[1])

def test_write_missing_values(self):
    """Missing values in nullable-dtype columns are omitted from the line protocol.

    Each row of the frame has at least one missing entry; the expected
    output for that row simply lacks the corresponding field.
    """
    from influxdb_client.extras import pd

    data_frame = pd.DataFrame({
        "a_bool": [True, None, False],
        "b_int": [None, 1, 2],
        "c_float": [1.0, 2.0, None],
        "d_str": ["a", "b", None],
    })

    # Cast every column to its pandas nullable extension dtype so the
    # missing entries are represented by pd.NA rather than float NaN / None.
    data_frame['a_bool'] = data_frame['a_bool'].astype(pd.BooleanDtype())
    data_frame['b_int'] = data_frame['b_int'].astype(pd.Int64Dtype())
    data_frame['c_float'] = data_frame['c_float'].astype(pd.Float64Dtype())
    data_frame['d_str'] = data_frame['d_str'].astype(pd.StringDtype())

    points = data_frame_to_list_of_points(
        data_frame=data_frame,
        point_settings=PointSettings(),
        data_frame_measurement_name='measurement')

    # One point per row; the trailing 0 / 1 / 2 in each expected line matches
    # the frame's default integer index.
    self.assertEqual(3, len(points))
    self.assertEqual("measurement a_bool=True,c_float=1.0,d_str=\"a\" 0", points[0])
    self.assertEqual("measurement b_int=1i,c_float=2.0,d_str=\"b\" 1", points[1])
    self.assertEqual("measurement a_bool=False,b_int=2i 2", points[2])

def test_write_field_bool(self):
from influxdb_client.extras import pd

Expand Down

0 comments on commit 364da93

Please sign in to comment.