Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: handle null values in data #636

Merged
merged 10 commits into from
Jan 31, 2024
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
## 1.41.0 [unreleased]

### Bug Fixes
1. [#636](https://github.com/influxdata/influxdb-client-python/pull/636): Handle missing data in data frames

## 1.40.0 [2024-01-30]

### Features
Expand Down
18 changes: 12 additions & 6 deletions influxdb_client/client/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -277,23 +277,27 @@ async def _to_flux_record_stream_async(self, response, query_options=None, respo
return (await _parser.__aenter__()).generator_async()

def _to_data_frame_stream(self, data_frame_index, response, query_options=None,
response_metadata_mode: FluxResponseMetadataMode = FluxResponseMetadataMode.full):
response_metadata_mode: FluxResponseMetadataMode = FluxResponseMetadataMode.full,
use_extension_dtypes=False):
"""
Parse HTTP response to DataFrame stream.

:param response: HTTP response from an HTTP client. Expected type: `urllib3.response.HTTPResponse`.
"""
_parser = self._to_data_frame_stream_parser(data_frame_index, query_options, response, response_metadata_mode)
_parser = self._to_data_frame_stream_parser(data_frame_index, query_options, response, response_metadata_mode,
use_extension_dtypes)
return _parser.generator()

async def _to_data_frame_stream_async(self, data_frame_index, response, query_options=None, response_metadata_mode:
FluxResponseMetadataMode = FluxResponseMetadataMode.full):
FluxResponseMetadataMode = FluxResponseMetadataMode.full,
use_extension_dtypes=False):
"""
Parse HTTP response to DataFrame stream.

:param response: HTTP response from an HTTP client. Expected type: `aiohttp.client_reqrep.ClientResponse`.
"""
_parser = self._to_data_frame_stream_parser(data_frame_index, query_options, response, response_metadata_mode)
_parser = self._to_data_frame_stream_parser(data_frame_index, query_options, response, response_metadata_mode,
use_extension_dtypes)
return (await _parser.__aenter__()).generator_async()

def _to_tables_parser(self, response, query_options, response_metadata_mode):
Expand All @@ -304,10 +308,12 @@ def _to_flux_record_stream_parser(self, query_options, response, response_metada
return FluxCsvParser(response=response, serialization_mode=FluxSerializationMode.stream,
query_options=query_options, response_metadata_mode=response_metadata_mode)

def _to_data_frame_stream_parser(self, data_frame_index, query_options, response, response_metadata_mode):
def _to_data_frame_stream_parser(self, data_frame_index, query_options, response, response_metadata_mode,
use_extension_dtypes):
return FluxCsvParser(response=response, serialization_mode=FluxSerializationMode.dataFrame,
data_frame_index=data_frame_index, query_options=query_options,
response_metadata_mode=response_metadata_mode)
response_metadata_mode=response_metadata_mode,
use_extension_dtypes=use_extension_dtypes)

def _to_data_frames(self, _generator):
"""Parse stream of DataFrames into expected type."""
Expand Down
18 changes: 13 additions & 5 deletions influxdb_client/client/flux_csv_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ class FluxCsvParser(object):

def __init__(self, response, serialization_mode: FluxSerializationMode,
data_frame_index: List[str] = None, query_options=None,
response_metadata_mode: FluxResponseMetadataMode = FluxResponseMetadataMode.full) -> None:
response_metadata_mode: FluxResponseMetadataMode = FluxResponseMetadataMode.full,
use_extension_dtypes=False) -> None:
"""
Initialize defaults.

Expand All @@ -75,6 +76,7 @@ def __init__(self, response, serialization_mode: FluxSerializationMode,
self.tables = TableList()
self._serialization_mode = serialization_mode
self._response_metadata_mode = response_metadata_mode
self._use_extension_dtypes = use_extension_dtypes
self._data_frame_index = data_frame_index
self._data_frame_values = []
self._profilers = query_options.profilers if query_options is not None else None
Expand Down Expand Up @@ -211,7 +213,7 @@ def _parse_flux_response_row(self, metadata, csv):
pass
else:

# to int converions todo
# to int conversions todo
current_id = int(csv[2])
if metadata.table_id == -1:
metadata.table_id = current_id
Expand Down Expand Up @@ -253,7 +255,11 @@ def _prepare_data_frame(self):
_temp_df = _temp_df.set_index(self._data_frame_index)

# Append data
return pd.concat([self._data_frame.astype(_temp_df.dtypes), _temp_df])
df = pd.concat([self._data_frame.astype(_temp_df.dtypes), _temp_df])

if self._use_extension_dtypes:
return df.convert_dtypes()
return df

def parse_record(self, table_index, table, csv):
"""Parse one record."""
Expand All @@ -273,8 +279,10 @@ def _to_value(self, str_val, column):
default_value = column.default_value
if default_value == '' or default_value is None:
if self._serialization_mode is FluxSerializationMode.dataFrame:
from ..extras import np
return self._to_value(np.nan, column)
if self._use_extension_dtypes:
from ..extras import pd
return pd.NA
return None
return None
return self._to_value(default_value, column)

Expand Down
22 changes: 18 additions & 4 deletions influxdb_client/client/query_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,8 @@ def query_stream(self, query: str, org=None, params: dict = None) -> Generator['
async_req=False, _preload_content=False, _return_http_data_only=False)
return self._to_flux_record_stream(response, query_options=self._get_query_options())

def query_data_frame(self, query: str, org=None, data_frame_index: List[str] = None, params: dict = None):
def query_data_frame(self, query: str, org=None, data_frame_index: List[str] = None, params: dict = None,
use_extension_dtypes: bool = False):
"""
Execute synchronous Flux query and return Pandas DataFrame.

Expand All @@ -234,6 +235,11 @@ def query_data_frame(self, query: str, org=None, data_frame_index: List[str] = N
If not specified the default value from ``InfluxDBClient.org`` is used.
:param data_frame_index: the list of columns that are used as DataFrame index
:param params: bind parameters
:param use_extension_dtypes: set to ``True`` to use panda's extension data types.
Useful for queries with ``pivot`` function.
When data has missing values, column data type may change (to ``object`` or ``float64``).
Nullable extension types (``Int64``, ``Float64``, ``boolean``) support ``panda.NA`` value.
For more info, see https://pandas.pydata.org/docs/user_guide/missing_data.html.
:return: :class:`~DataFrame` or :class:`~List[DataFrame]`

.. warning:: For the optimal processing of the query results use the ``pivot() function`` which align results as a table.
Expand All @@ -250,10 +256,12 @@ def query_data_frame(self, query: str, org=None, data_frame_index: List[str] = N
- https://docs.influxdata.com/flux/latest/stdlib/universe/pivot/
- https://docs.influxdata.com/flux/latest/stdlib/influxdata/influxdb/schema/fieldsascols/
""" # noqa: E501
_generator = self.query_data_frame_stream(query, org=org, data_frame_index=data_frame_index, params=params)
_generator = self.query_data_frame_stream(query, org=org, data_frame_index=data_frame_index, params=params,
use_extension_dtypes=use_extension_dtypes)
return self._to_data_frames(_generator)

def query_data_frame_stream(self, query: str, org=None, data_frame_index: List[str] = None, params: dict = None):
def query_data_frame_stream(self, query: str, org=None, data_frame_index: List[str] = None, params: dict = None,
use_extension_dtypes: bool = False):
"""
Execute synchronous Flux query and return stream of Pandas DataFrame as a :class:`~Generator[DataFrame]`.

Expand All @@ -265,6 +273,11 @@ def query_data_frame_stream(self, query: str, org=None, data_frame_index: List[s
If not specified the default value from ``InfluxDBClient.org`` is used.
:param data_frame_index: the list of columns that are used as DataFrame index
:param params: bind parameters
:param use_extension_dtypes: set to ``True`` to use panda's extension data types.
Useful for queries with ``pivot`` function.
When data has missing values, column data type may change (to ``object`` or ``float64``).
Nullable extension types (``Int64``, ``Float64``, ``boolean``) support ``panda.NA`` value.
For more info, see https://pandas.pydata.org/docs/user_guide/missing_data.html.
:return: :class:`~Generator[DataFrame]`

.. warning:: For the optimal processing of the query results use the ``pivot() function`` which align results as a table.
Expand All @@ -289,7 +302,8 @@ def query_data_frame_stream(self, query: str, org=None, data_frame_index: List[s

return self._to_data_frame_stream(data_frame_index=data_frame_index,
response=response,
query_options=self._get_query_options())
query_options=self._get_query_options(),
use_extension_dtypes=use_extension_dtypes)

def __del__(self):
"""Close QueryAPI."""
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
]

extra_requires = [
'pandas>=0.25.3',
'pandas>=1.0.0',
'numpy'
]

Expand Down
Loading