From c41633eff23b6794c8042ebc6cd984b1c7de133a Mon Sep 17 00:00:00 2001 From: rajagurunath Date: Fri, 24 Sep 2021 23:46:54 +0530 Subject: [PATCH 1/8] experimenting with delta-rs --- dask_deltatable/core.py | 293 ++++++++++----------------------------- dask_deltatable/utils.py | 135 ------------------ requirements.txt | 1 + tests/test_core.py | 15 +- 4 files changed, 86 insertions(+), 358 deletions(-) delete mode 100644 dask_deltatable/utils.py diff --git a/dask_deltatable/core.py b/dask_deltatable/core.py index ee142bb..05a7bd8 100644 --- a/dask_deltatable/core.py +++ b/dask_deltatable/core.py @@ -1,196 +1,36 @@ import json +import os import re +from sys import path import pyarrow.parquet as pq from dask.base import tokenize +from dask.config import collect from dask.dataframe.io import from_delayed from dask.dataframe.io.parquet.core import get_engine +from dask.dataframe.utils import check_matching_columns from dask.delayed import delayed +from deltalake import DeltaTable from fsspec.core import get_fs_token_paths from pyarrow import dataset as pa_ds -from .utils import ( - FASTPARQUET_CHECKPOINT_SCHEMA, - PYARROW_CHECKPOINT_SCHEMA, - schema_from_string, -) - -__all__ = ["read_delta_table"] - - -class DeltaTable: - """ - Core DeltaTable Read Algorithm - - Parameters - ---------- - path: str - path of Delta table directory - version: int, default 0 - DeltaTable Version, used for Time Travelling across the - different versions of the parquet datasets - columns: None or list(str) - Columns to load. If None, loads all. - engine : str, default 'auto' - Parquet reader library to use. Options include: 'auto', 'fastparquet', - 'pyarrow', 'pyarrow-dataset', and 'pyarrow-legacy'. Defaults to 'auto', - which selects the FastParquetEngine if fastparquet is installed (and - ArrowDatasetEngine otherwise). If 'pyarrow' or 'pyarrow-dataset' is - specified, the ArrowDatasetEngine (which leverages the pyarrow.dataset - API) will be used. If 'pyarrow-legacy' is specified, ArrowLegacyEngine - will be used (which leverages the pyarrow.parquet.ParquetDataset API). - NOTE: The 'pyarrow-legacy' option (ArrowLegacyEngine) is deprecated - for pyarrow>=5. - checkpoint: int, default None - DeltaTable protocol aggregates the json delta logs and creates parquet - checkpoint for every 10 versions.This will save some IO. - for example: - if you want to read 106th version of dataset, No need to read all - the all the 106 json files (which isIO heavy), instead read 100th - parquet checkpoint which contains all the logs of 100 versions in single - parquet file and now read the extra 6 json files to match the given - 106th version. - (so IO reduced from 106 to 7) - storage_options : dict, default None - Key/value pairs to be passed on to the file-system backend, if any. 
- """ +class DeltaTableWrapper(object): def __init__( - self, - path: str, - version: int = 0, - columns=None, - engine="auto", - checkpoint=None, - storage_options=None, - ): - self.path = str(path).rstrip("/") + self, path, version, columns, datetime=None, storage_options=None + ) -> None: + self.path = path self.version = version - self.pq_files = set() - self.delta_log_path = f"{self.path}/_delta_log" - self.fs, self.fs_token, _ = get_fs_token_paths( - path, storage_options=storage_options - ) - self.engine = engine self.columns = columns - self.checkpoint = ( - checkpoint if checkpoint is not None else self.get_checkpoint_id() - ) + self.datetime = datetime self.storage_options = storage_options - self.schema = None - - def get_checkpoint_id(self): - """ - if _last_checkpoint file exists, returns checkpoint_id else zero - """ - try: - last_checkpoint_version = json.loads( - self.fs.cat(f"{self.delta_log_path}/_last_checkpoint") - )["version"] - except FileNotFoundError: - last_checkpoint_version = 0 - return last_checkpoint_version - - def get_pq_files_from_checkpoint_parquet(self): - """ - use checkpoint_id to get logs from parquet files - """ - if self.checkpoint == 0: - return - checkpoint_path = ( - f"{self.delta_log_path}/{self.checkpoint:020}.checkpoint.parquet" - ) - if not self.fs.exists(checkpoint_path): - raise ValueError( - f"Parquet file with the given checkpoint {self.checkpoint} does not exists: " - f"File {checkpoint_path} not found" - ) - - # reason for this `if condition` was that FastParquetEngine seems to normalizes the json present in each column - # not sure whether there is a better solution for handling this . - # `https://fastparquet.readthedocs.io/en/latest/details.html#reading-nested-schema` - if self.engine.__name__ == "ArrowDatasetEngine": - parquet_checkpoint = self.engine.read_partition( - fs=self.fs, - pieces=[(checkpoint_path, None, None)], - columns=PYARROW_CHECKPOINT_SCHEMA, - index=None, - ) - for i, row in parquet_checkpoint.iterrows(): - if row["metaData"] is not None: - # get latest schema/columns , since delta Lake supports Schema Evolution. - self.schema = schema_from_string(row["metaData"]["schemaString"]) - elif row["add"] is not None: - self.pq_files.add(f"{self.path}/{row['add']['path']}") - - elif self.engine.__name__ == "FastParquetEngine": - parquet_checkpoint = self.engine.read_partition( - fs=self.fs, - pieces=[(checkpoint_path, None, None)], - columns=FASTPARQUET_CHECKPOINT_SCHEMA, - index=None, - ) - for i, row in parquet_checkpoint.iterrows(): - if row["metaData.schemaString"]: - self.schema = schema_from_string(row["metaData.schemaString"]) - elif row["add.path"]: - self.pq_files.add(f"{self.path}/{row['add.path']}") - - def get_pq_files_from_delta_json_logs(self): - """ - start from checkpoint id, collect logs from every json file until the - given version - example: - checkpoint 10, version 16 - 1. read the logs from 10th checkpoint parquet ( using above func) - 2. 
read logs from json files until version 16 - log Collection: - for reading the particular version of delta table, We are concerned - about `add` and `remove` Operation (transaction) only.(which involves - adding and removing respective parquet file transaction) - """ - log_files = self.fs.glob( - f"{self.delta_log_path}/{self.checkpoint // 10:019}*.json" + self.dt = DeltaTable(table_uri=self.path, version=self.version) + self.fs, self.fs_token, _ = get_fs_token_paths( + path, storage_options=storage_options ) - if len(log_files) == 0: - raise RuntimeError( - f"No Json files found at _delta_log_path:- {self.delta_log_path}" - ) - log_files = sorted(log_files) - log_versions = [ - int(re.findall(r"(\d{20})", log_file_name)[0]) - for log_file_name in log_files - ] - if (self.version is not None) and (self.version not in log_versions): - raise ValueError( - f"Cannot time travel Delta table to version {self.version}, Available versions for given " - f"checkpoint {self.checkpoint} are {log_versions}" - ) - for log_file_name, log_version in zip(log_files, log_versions): - log = self.fs.cat(log_file_name).decode().split("\n") - for line in log: - if line: # for last empty line - meta_data = json.loads(line) - if "add" in meta_data.keys(): - file = f"{self.path}/{meta_data['add']['path']}" - self.pq_files.add(file) - - elif "remove" in meta_data.keys(): - remove_file = f"{self.path}/{meta_data['remove']['path']}" - if remove_file in self.pq_files: - self.pq_files.remove(remove_file) - elif "metaData" in meta_data.keys(): - schema_string = meta_data["metaData"]["schemaString"] - self.schema = schema_from_string(schema_string) - - if self.version == int(log_version): - break + self.schema = self.dt.pyarrow_schema() def read_delta_dataset(self, f, **kwargs): - """ - Function to read single parquet file using pyarrow dataset (to support - schema evolution) - """ schema = kwargs.pop("schema", None) or self.schema filter = kwargs.pop("filter", None) if filter: @@ -220,31 +60,72 @@ def _make_meta_from_schema(self): meta[field.name] = field.type.to_pandas_dtype() return meta + def _history_helper(self, log_file_name): + log = self.fs.cat(log_file_name).decode().split("\n") + for line in log: + if line: + meta_data = json.loads(line) + if "commitInfo" in meta_data: + return meta_data["commitInfo"] + + def history(self, limit, **kwargs): + delta_log_path = str(path).rstrip("/") + "/_delta_log" + log_files = self.fs.glob(f"{delta_log_path}/*.json") + if len(log_files) == 0: + raise RuntimeError(f"No History (logs) found at:- {delta_log_path}/") + log_files = sorted(log_files, reverse=True) + last_n_files = log_files[:limit] + parts = [ + delayed( + self._history_helper, + name="read-delta-history" + tokenize(self.fs_token, f, **kwargs), + )(f, **kwargs) + for f in list(last_n_files) + ] + return parts + + def vaccum(self): + """ + Run the Vacuum command on the Delta Table: list and delete files no longer referenced by the Delta table and are older than the retention threshold. + + :param retention_hours: the retention threshold in hours, if none then the value from `configuration.deletedFileRetentionDuration` is used or default of 1 week otherwise. + :param dry_run: when activated, list only the files, delete otherwise + :return: the list of files no longer referenced by the Delta Table and are older than the retention threshold. 
+ """ + pass + # tombstones = self.dt.vacuum() + + def get_pq_files(self): + """ + get the list of parquet files after loading the + current datetime version + """ + __doc__ == self.dt.load_with_datetime.__doc__ + + if self.datetime is not None: + self.dt.load_with_datetime(self.datetime) + return self.dt.file_uris() + def read_delta_table(self, **kwargs): """ Reads the list of parquet files in parallel """ - if len(self.pq_files) == 0: + pq_files = self.get_pq_files() + if len(pq_files) == 0: raise RuntimeError("No Parquet files are available") parts = [ delayed( self.read_delta_dataset, name="read-delta-table-" + tokenize(self.fs_token, f, **kwargs), )(f, **kwargs) - for f in list(self.pq_files) + for f in list(pq_files) ] meta = self._make_meta_from_schema() return from_delayed(parts, meta=meta) def read_delta_table( - path, - version=None, - columns=None, - engine="auto", - checkpoint=None, - storage_options=None, - **kwargs, + path, version=None, columns=None, storage_options=None, **kwargs, ): """ Read a Delta Table into a Dask DataFrame @@ -259,28 +140,17 @@ def read_delta_table( version: int, default 0 DeltaTable Version, used for Time Travelling across the different versions of the parquet datasets + datetime: str, default None + Time travel Delta table to the latest version that's created at or before provided `datetime_string` argument. + The `datetime_string` argument should be an RFC 3339 and ISO 8601 date and time string. + + Examples: + `2018-01-26T18:30:09Z` + `2018-12-19T16:39:57-08:00` + `2018-01-26T18:30:09.453+00:00` + #(copied from delta-rs docs) columns: None or list(str) Columns to load. If None, loads all. - engine : str, default 'auto' - Parquet reader library to use. Options include: 'auto', 'fastparquet', - 'pyarrow', 'pyarrow-dataset', and 'pyarrow-legacy'. Defaults to 'auto', - which selects the FastParquetEngine if fastparquet is installed (and - ArrowDatasetEngine otherwise). If 'pyarrow' or 'pyarrow-dataset' is - specified, the ArrowDatasetEngine (which leverages the pyarrow.dataset - API) will be used. If 'pyarrow-legacy' is specified, ArrowLegacyEngine - will be used (which leverages the pyarrow.parquet.ParquetDataset API). - NOTE: The 'pyarrow-legacy' option (ArrowLegacyEngine) is deprecated - for pyarrow>=5. - checkpoint: int, default None - DeltaTable protocol aggregates the json delta logs and creates parquet - checkpoint for every 10 versions.This will save some IO. - for example: - if you want to read 106th version of dataset, No need to read all - the all the 106 json files (which isIO heavy), instead read 100th - parquet checkpoint which contains all the logs of 100 versions in single - parquet file and now read the extra 6 json files to match the given - 106th version. - (so IO reduced from 106 to 7) storage_options : dict, default None Key/value pairs to be passed on to the file-system backend, if any. kwargs: dict,optional @@ -291,7 +161,7 @@ def read_delta_table( schema : pyarrow.Schema Used to maintain schema evolution in deltatable. delta protocol stores the schema string in the json log files which is converted - into pyarrow.Schema and used for schema evolution (refer delta/utils.py). + into pyarrow.Schema and used for schema evolution i.e Based on particular version, some columns can be shown or not shown. 
filter: Union[List[Tuple[str, str, Any]], List[List[Tuple[str, str, Any]]]], default None @@ -310,16 +180,7 @@ def read_delta_table( >>> df = dd.read_delta_table('s3://bucket/my-delta-table') # doctest: +SKIP """ - engine = get_engine(engine) - dt = DeltaTable( - path=path, - version=version, - checkpoint=checkpoint, - columns=columns, - engine=engine, - storage_options=storage_options, + dtw = DeltaTableWrapper( + path=path, version=version, columns=columns, storage_options=storage_options, ) - # last_checkpoint_version = dt.get_checkpoint_id() - dt.get_pq_files_from_checkpoint_parquet() - dt.get_pq_files_from_delta_json_logs() - return dt.read_delta_table(columns=columns, **kwargs) + return dtw.read_delta_table(columns=columns, **kwargs) diff --git a/dask_deltatable/utils.py b/dask_deltatable/utils.py deleted file mode 100644 index c8f263e..0000000 --- a/dask_deltatable/utils.py +++ /dev/null @@ -1,135 +0,0 @@ -import json -from typing import Union - -import pyarrow as pa - -PYARROW_CHECKPOINT_SCHEMA = [ - "txn", - "add", - "remove", - "metaData", - "protocol", - "commitInfo", -] -FASTPARQUET_CHECKPOINT_SCHEMA = [ - "txn.appId", - "txn.version", - "txn.lastUpdated", - "add.path", - "add.partitionValues", - "add.size", - "add.modificationTime", - "add.dataChange", - "add.stats", - "add.tags", - "remove.path", - "remove.deletionTimestamp", - "remove.dataChange", - "metaData.id", - "metaData.name", - "metaData.description", - "metaData.format.provider", - "metaData.format.options", - "metaData.schemaString", - "metaData.partitionColumns", - "metaData.configuration", - "metaData.createdTime", - "protocol.minReaderVersion", - "protocol.minWriterVersion", - "commitInfo.version", - "commitInfo.timestamp", - "commitInfo.userId", - "commitInfo.userName", - "commitInfo.operation", - "commitInfo.operationParameters", - "commitInfo.job.jobId", - "commitInfo.job.jobName", - "commitInfo.job.runId", - "commitInfo.job.jobOwnerId", - "commitInfo.job.triggerType", - "commitInfo.notebook.notebookId", - "commitInfo.clusterId", - "commitInfo.readVersion", - "commitInfo.isolationLevel", - "commitInfo.isBlindAppend", - "commitInfo.operationMetrics", - "commitInfo.userMetadata", -] - - -def schema_from_string(schema_string: str): - - fields = [] - schema = json.loads(schema_string) - for field in schema["fields"]: - name = field["name"] - type = field["type"] - nullable = field["nullable"] - metadata = field["metadata"] - pa_type = map_type(type) - - fields.append(pa.field(name, pa_type, nullable=nullable, metadata=metadata)) - return pa.schema(fields) - - -def map_type(input_type: Union[dict, str]): - - simple_type_mapping = { - "byte": pa.int8(), - "short": pa.int16(), - "integer": pa.int32(), - "long": pa.int64(), - "float": pa.float32(), - "double": pa.float64(), - "string": pa.string(), - "boolean": pa.bool_(), - "binary": pa.binary(), - "date": pa.date32(), - "timestamp": pa.timestamp("ns"), - } - - # If type is string, it should be a "simple" datatype - if isinstance(input_type, str): - - # map simple data types, that can be directly converted - if input_type in simple_type_mapping: - pa_type = simple_type_mapping[input_type] - else: - raise TypeError( - f"Got type unsupported {input_type} when trying to parse schema" - ) - - # nested field needs special handling - else: - if input_type["type"] == "array": - # map list type to pyarrow types - element_type = map_type(input_type["elementType"]) - # pass a field as the type to the list with a name of "element". 
- # This is just to comply with the way pyarrow creates lists when infering schemas - pa_field = pa.field("element", element_type) - pa_type = pa.list_(pa_field) - - elif input_type["type"] == "map": - key_type = map_type(input_type["keyType"]) - item_type = map_type(input_type["valueType"]) - pa_type = pa.map_(key_type, item_type) - - elif input_type["type"] == "struct": - fields = [] - for field in input_type["fields"]: - name = field["name"] - input_type = field["type"] - nullable = field["nullable"] - metadata = field["metadata"] - field_type = map_type(input_type) - - fields.append( - pa.field(name, field_type, nullable=nullable, metadata=metadata) - ) - pa_type = pa.struct(fields) - - else: - raise TypeError( - f"Got type unsupported {input_type} when trying to parse schema" - ) - return pa_type diff --git a/requirements.txt b/requirements.txt index 5091375..b095396 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,3 @@ dask[dataframe,distributed] pyarrow +deltalake diff --git a/tests/test_core.py b/tests/test_core.py index 37d9d13..0f46bd1 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -13,7 +13,7 @@ def simple_table(tmpdir): output_dir = tmpdir deltaf = zipfile.ZipFile("tests/data/simple.zip") deltaf.extractall(output_dir) - return output_dir + "/test1/" + return str(output_dir) + "/test1/" @pytest.fixture() @@ -21,7 +21,7 @@ def partition_table(tmpdir): output_dir = tmpdir deltaf = zipfile.ZipFile("tests/data/partition.zip") deltaf.extractall(output_dir) - return output_dir + "/test2/" + return str(output_dir) + "/test2/" @pytest.fixture() @@ -29,7 +29,7 @@ def empty_table1(tmpdir): output_dir = tmpdir deltaf = zipfile.ZipFile("tests/data/empty1.zip") deltaf.extractall(output_dir) - return output_dir + "/empty/" + return str(output_dir) + "/empty/" @pytest.fixture() @@ -37,7 +37,7 @@ def empty_table2(tmpdir): output_dir = tmpdir deltaf = zipfile.ZipFile("tests/data/empty2.zip") deltaf.extractall(output_dir) - return output_dir + "/empty2/" + return str(output_dir) + "/empty2/" @pytest.fixture() @@ -45,7 +45,7 @@ def checkpoint_table(tmpdir): output_dir = tmpdir deltaf = zipfile.ZipFile("tests/data/checkpoint.zip") deltaf.extractall(output_dir) - return output_dir + "/checkpoint/" + return str(output_dir) + "/checkpoint/" def test_read_delta(simple_table): @@ -56,6 +56,7 @@ def test_read_delta(simple_table): def test_read_delta_with_different_versions(simple_table): + print(simple_table) df = ddt.read_delta_table(simple_table, version=0) assert df.compute().shape == (100, 3) @@ -117,7 +118,7 @@ def test_checkpoint(checkpoint_table): df = ddt.read_delta_table(checkpoint_table, checkpoint=20, version=22) assert df.compute().shape[0] == 115 - with pytest.raises(ValueError): + with pytest.raises(Exception): # Parquet file with the given checkpoint 30 does not exists: # File {checkpoint_path} not found" _ = ddt.read_delta_table(checkpoint_table, checkpoint=30, version=33) @@ -126,5 +127,5 @@ def test_checkpoint(checkpoint_table): def test_out_of_version_error(simple_table): # Cannot time travel Delta table to version 4 , Available versions for given # checkpoint 0 are [0,1] - with pytest.raises(ValueError): + with pytest.raises(Exception): _ = ddt.read_delta_table(simple_table, version=4) From 2c5c7dd3af34089586e6637a0bb13cdfc442100a Mon Sep 17 00:00:00 2001 From: rajagurunath Date: Tue, 28 Sep 2021 01:17:57 +0530 Subject: [PATCH 2/8] added following functionalites from delta-rs Vacuum read history load with datetime --- README.md | 31 +++++++- 
dask_deltatable/__init__.py | 2 +- dask_deltatable/core.py | 140 +++++++++++++++++++++++++++++------- tests/data/simple2.zip | Bin 0 -> 36773 bytes tests/data/vacuum.zip | Bin 0 -> 14210 bytes tests/test_core.py | 93 ++++++++++++++++++++++++ 6 files changed, 239 insertions(+), 27 deletions(-) create mode 100644 tests/data/simple2.zip create mode 100644 tests/data/vacuum.zip diff --git a/README.md b/README.md index 651f976..47f9fc5 100644 --- a/README.md +++ b/README.md @@ -8,7 +8,7 @@ To Try out the package: pip install dask-deltatable ``` -Features: +### Features: 1. Reads the parquet files based on delta logs parallely using dask engine 2. Supports all three filesystem like s3, azurefs, gcsfs 3. Supports some delta features like @@ -17,3 +17,32 @@ Features: - parquet filters - row filter - partition filter +4. Query Delta commit info - History +5. vacuum the old/ unused parquet files +6. load different versions of data using datetime. + +### Usage: + +``` +import dask_deltatable as ddt + +# read delta table +ddt.read_delta_table("delta_path") + +# read delta table for specific version +ddt.read_delta_table("delta_path",version=3) + +# read delta table for specific datetime +ddt.read_delta_table("delta_path",datetime="2018-12-19T16:39:57-08:00") + + +# read delta complete history +ddt.read_delta_history("delta_path") + +# read delta history upto given limit +ddt.read_delta_history("delta_path",limit=5) + +# read delta history to delete the files +ddt.vacuum("delta_path",dry_run=False) + +``` diff --git a/dask_deltatable/__init__.py b/dask_deltatable/__init__.py index 6f2e377..0d3f019 100644 --- a/dask_deltatable/__init__.py +++ b/dask_deltatable/__init__.py @@ -1 +1 @@ -from .core import read_delta_table +from .core import read_delta_history, read_delta_table, vacuum diff --git a/dask_deltatable/core.py b/dask_deltatable/core.py index 05a7bd8..2bcefc3 100644 --- a/dask_deltatable/core.py +++ b/dask_deltatable/core.py @@ -1,14 +1,11 @@ import json import os -import re -from sys import path +from urllib.parse import urlparse +import dask import pyarrow.parquet as pq from dask.base import tokenize -from dask.config import collect from dask.dataframe.io import from_delayed -from dask.dataframe.io.parquet.core import get_engine -from dask.dataframe.utils import check_matching_columns from dask.delayed import delayed from deltalake import DeltaTable from fsspec.core import get_fs_token_paths @@ -68,13 +65,16 @@ def _history_helper(self, log_file_name): if "commitInfo" in meta_data: return meta_data["commitInfo"] - def history(self, limit, **kwargs): - delta_log_path = str(path).rstrip("/") + "/_delta_log" + def history(self, limit=None, **kwargs): + delta_log_path = str(self.path).rstrip("/") + "/_delta_log" log_files = self.fs.glob(f"{delta_log_path}/*.json") - if len(log_files) == 0: + if len(log_files) == 0: # pragma no cover raise RuntimeError(f"No History (logs) found at:- {delta_log_path}/") log_files = sorted(log_files, reverse=True) - last_n_files = log_files[:limit] + if limit is None: + last_n_files = log_files + else: + last_n_files = log_files[:limit] parts = [ delayed( self._history_helper, @@ -82,18 +82,49 @@ def history(self, limit, **kwargs): )(f, **kwargs) for f in list(last_n_files) ] - return parts + return dask.compute(parts)[0] + + def _vacuum_helper(self, filename_to_delete): + full_path = urlparse(self.path) + if full_path.scheme: # pragma no cover + # for different storage backend, delta-rs vacuum gives path to the file + # it will not provide bucket name and scheme s3 
or gcfs etc. so adding + # manually + filename_to_delete = ( + f"{full_path.scheme}://{full_path.netloc}/{filename_to_delete}" + ) + self.fs.rm_file(filename_to_delete) - def vaccum(self): + def vacuum(self, retention_hours=168, dry_run=True): """ - Run the Vacuum command on the Delta Table: list and delete files no longer referenced by the Delta table and are older than the retention threshold. + Run the Vacuum command on the Delta Table: list and delete files no + longer referenced by the Delta table and are older than the + retention threshold. - :param retention_hours: the retention threshold in hours, if none then the value from `configuration.deletedFileRetentionDuration` is used or default of 1 week otherwise. - :param dry_run: when activated, list only the files, delete otherwise - :return: the list of files no longer referenced by the Delta Table and are older than the retention threshold. + retention_hours: the retention threshold in hours, if none then + the value from `configuration.deletedFileRetentionDuration` is used + or default of 1 week otherwise. + dry_run: when activated, list only the files, delete otherwise + + Returns + ------- + the list of files no longer referenced by the Delta Table and are + older than the retention threshold. """ - pass - # tombstones = self.dt.vacuum() + + tombstones = self.dt.vacuum(retention_hours=retention_hours) + if dry_run: + return tombstones + else: + parts = [ + delayed( + self._vacuum_helper, + name="delta-vacuum" + + tokenize(self.fs_token, f, retention_hours, dry_run), + )(f) + for f in tombstones + ] + dask.compute(parts)[0] def get_pq_files(self): """ @@ -125,7 +156,7 @@ def read_delta_table(self, **kwargs): def read_delta_table( - path, version=None, columns=None, storage_options=None, **kwargs, + path, version=None, columns=None, storage_options=None, datetime=None, **kwargs, ): """ Read a Delta Table into a Dask DataFrame @@ -137,12 +168,14 @@ def read_delta_table( ---------- path: str path of Delta table directory - version: int, default 0 + version: int, default None DeltaTable Version, used for Time Travelling across the different versions of the parquet datasets datetime: str, default None - Time travel Delta table to the latest version that's created at or before provided `datetime_string` argument. - The `datetime_string` argument should be an RFC 3339 and ISO 8601 date and time string. + Time travel Delta table to the latest version that's created at or + before provided `datetime_string` argument. + The `datetime_string` argument should be an RFC 3339 and ISO 8601 date + and time string. Examples: `2018-01-26T18:30:09Z` @@ -160,9 +193,10 @@ def read_delta_table( schema : pyarrow.Schema Used to maintain schema evolution in deltatable. - delta protocol stores the schema string in the json log files which is converted - into pyarrow.Schema and used for schema evolution - i.e Based on particular version, some columns can be shown or not shown. + delta protocol stores the schema string in the json log files which is + converted into pyarrow.Schema and used for schema evolution + i.e Based on particular version, some columns can be + shown or not shown. filter: Union[List[Tuple[str, str, Any]], List[List[Tuple[str, str, Any]]]], default None List of filters to apply, like ``[[('col1', '==', 0), ...], ...]``. 
@@ -181,6 +215,62 @@ def read_delta_table( """ dtw = DeltaTableWrapper( - path=path, version=version, columns=columns, storage_options=storage_options, + path=path, + version=version, + columns=columns, + storage_options=storage_options, + datetime=datetime, ) return dtw.read_delta_table(columns=columns, **kwargs) + + +def read_delta_history(path, limit=None, storage_options=None): + """ + Run the history command on the DeltaTable. + The operations are returned in reverse chronological order. + + Parallely reads delta log json files using dask delayed and gathers the + list of commit_info (history) + + Parameters + ---------- + path: str + path of Delta table directory + limit: int, default None + the commit info limit to return, defaults to return all history + + Returns + ------- + list of the commit infos registered in the transaction log + """ + + dtw = DeltaTableWrapper( + path=path, version=None, columns=None, storage_options=storage_options + ) + return dtw.history(limit=limit) + + +def vacuum(path, retention_hours=168, dry_run=True, storage_options=None): + """ + Run the Vacuum command on the Delta Table: list and delete + files no longer referenced by the Delta table and are + older than the retention threshold. + + retention_hours: int, default 168 + the retention threshold in hours, if none then the value + from `configuration.deletedFileRetentionDuration` is used + or default of 1 week otherwise. + dry_run: bool, default True + when activated, list only the files, delete otherwise + + Returns + ------- + None or List of tombstones + i.e the list of files no longer referenced by the Delta Table + and are older than the retention threshold. + """ + + dtw = DeltaTableWrapper( + path=path, version=None, columns=None, storage_options=storage_options + ) + return dtw.vacuum(retention_hours=retention_hours, dry_run=dry_run) diff --git a/tests/data/simple2.zip b/tests/data/simple2.zip new file mode 100644 index 0000000000000000000000000000000000000000..3f6aa355cd9bb7dc63b2b0cc0a4725b9652cb042 GIT binary patch literal 36773 zcmeHw2{_bi`@fRXVv9m5TN0Vfz6eQ{qzy@gnl(yNAw-)jEtZN@DitMJLn$P(mC`~= zvXqh#5~76PGm4pU%*>ZL|M$B7@0)XR=$z*Jac|H4-1q0cXRR^+geiRVkH*x+W&_{+ z`69$On{T)Mb|(kwCTAkafvRr4ZZaSL{hubpZ1CygHkEJOgwAn%d=n(VUs%iXGClee zl%|0{rGJ_af0;#2#9huXIQT%o$QU@5iX#wVa1sp@;!L0x19oxr^buqTu zP1e(oFK?B|z?(hH2{FMYNjJPW67bX6aI$MsJCP9aDm&|!AAt&oOSPz8n#1p8NWC?E z7c9rpQ$)ToKuiuB@2F>vfS{l6nugcQaJu&@8O7d4-v55*DzDD8Fb&o9L zN-Sp;erTUpVlNR;sOo=c!K+i7rIkzFX3f(udGC*$__oSY5cV!IYnVm-)Yq1QU{N?2 ziU_BoaY!7DLWKjpAxKyl9zmtSFi0edOhdzIa3p3Vy+O68z=~x!R*OoQQOLiDi$x&_ za2TFQ2G)$l!Du7|K!ychh$spQhNqAb2qX@RrO`%22GybzY)xl9ygXsH%**E*uj6x% z;o8u8nf}Yuqf*Y?-nf4y8TMY}bx1*LwH5co*|6)b~CL)AT-fp2DJvPMz9t^%OeLPtqu=bipiPb5Q}4 zIKv#x)dg+%<<3_gjH6DwoslMbF2~So9IM_IphWL-iv$QP8b$(EgT^DsFf@?{^hSmw zVI(Sz2qOSn!eGfr3Jymc$)cd@?MigW#9a%<6;8yYK6PpOTDwY9NK9Sf=q8%q3y0-)AjJn&qee6;k7>|cz>QAe)qe|&m)`aF7?Su)P9`4 zOgmZFe{*Ba?AOMY_q1QPOI|QOB-U`AD7j?{;S93K1FJYkBiSUZWkHUxtF~HwY2zD< zet7h>i4${e1X9U-EtleE>Ml~kA8VD@yN4W~ccU!edJ*nKlY#1+9bY+3)UH%}iRQ!T@aFIK$1306t8%w> z-)2*0Y3X)ZkYCKBH>&vfw~zDIH_Qw8@Lqp<1Nx}%vKe|yC7Mj&L8o?V3Qxaq-APB< zySW8+nD*6V6RX;0mL7h^Rc&w(2xvezL?RqUBO}4lLPL{*+7K8R0)+wpBB2O)8hXrX zb2vUXZ{rGnLdXHK_bjXWbQia_`4@ftCB=-woM)E0HU%9y-YYZr*EX+GA$40%zd~F@ zW?1W9%04g2>3ULao`;rp_!nDec<-yVAlQAK7ce^=FVC7A|o2lw`@63>FBYg;9Sy4=s%M{d>LTlr1*Q%~&5 zaoQnglQ%qwu?skHWZd}e74?OaemG>j)oKpjyEkd^vxPe%V>K2|RhJ;j>Fx+lJ!^9D ze1YnPYdba5mn&Q_cwrZHXn{XTOL*o^jH*#uR zq9iXF0aS0}c=@v@nk!H|ScsxBHLQ-el9F43)8srTlcQMb~*+^7s7wtu?pn16m6$Tk-G~ 
z&>HISb)3P0@>IIk7X8s$i(C86SDsIS*1`t0w!flH_mf}z%5g14D=p*X77iV|Z93XD z&0Wv-eynzJm2|J$&WMph12HmX!_^dYU}4xmj0_rxk;*y9)z{k4^cX2do#xoZmzZ%N z=;uUMt>Knc1afmWINjOCc5*=ZscjcFSU)^<_W1D#-TE13auSDrWernZE zvI2R8Z0W2`jn5yg*???G+h9ps?Ch8Q-n!>)Ne>NQjr;9vDs*S-8*sJ@L(cY$7PZU} z+gLJZ(Al(xoGo@hZ6Rg453l}qHVfcvw^*Fbtv&0dXAw|a{h->sD!=M}$^gz*9R{2& zo8fGJlYTg-RB3NNu{R@)?riZx&XyQ4;B0xS7ov7*W)3);)Aa#ooAPNVGO%#K*%thE zwrPKyjgiI+73H{a&6aq{NuXv+{Dm_TSS{*&p+QFt$i^pf%)j98>mIf)sC|vEoqa67p*Sn_2OUo^zQ0rJ^s+-kalZvj$*qTeHb{?*c4PS7?Dj>lm zeMJgIt6cm<R`g` zWt~Xv{I*U#!iAnN?>YJ>VU{UB-tjrlz-uUB-nMyDA9KPSnQS%r?&;qNbAg~}ICH|> z1KUyk`rI>m!kqjkVXhb1$doWIWKEcNWVGB|)>2&EqL!Fx!AQJv2NEx}^$ltPAKKjM ziB|!6An`IvI)2JtQ;42;>CqD}wZk+2k$B|>D(iDS3lInx8cD;W@H7H&Vfs&SA`J#| zEes4vBa>-F1POse(MGZ-XlDV9^UZna1aI&KCwT6MaQX?Zh0MsBI$7bkn|6*|dLAvo z>o`5}y59dg@d}FLS2*(%r`&0}KJi)SFa7$HoAordl;@w-6~1;n`C{>Mzd(nVR~N5H z^1dMQ_(!V8_p&8VRJ;%7I9ZHqmO8L_L99=(@#Lqqdj(ArkMrH!>Y{k#+p3T0Cstrf zyvo0R@{0Kp@_JR-Q<;`_bg4m^=g&LueV**GI6FR59Ywu)!0g2=MREQK6{*ieM7}l9 zGQ27jx?FecxoA;sW#O`ZrtN|UI)`nS*rOsf2}jH|j6S%zozT+=xbl9jfQe|nhs2W) zl-WVvddjM^*A{Hf*gN&IwC77ny0bCjz|DN~SgvsZfdPdMG8T(Q(ZD^2fJMR3cpQkh zSOf}2!w@M5Q1KvP@gsU0)UzNu0Z(Ct29AND52@j`m`xO_gEMiH!;USC!qW(q9@faR z9d$-3ho6s+D8|Q!;P8qdnhpOzEZVwzha)E=EDke1rlc?G)75wM^7<)rR%t7s6lYm? zOXc~Tb&Mm8rt#g~TCF4t=Umfko`?@tX_pGb$eviDceaWH^ zo@tAAX#{0$bCtf++5TaB|Eoo=y&kVO-S7UEPyE&QtU>be+t@nIxcd*hgBE7QQulOq zHn%*#uW_N^vO2ERv9;iBT7=fSg5I{YwmVri)tehmT~Pn{^>GMUro!-QMdiHH&tq=5 zt44O4?moNp>@3H7S&PdXItnotdpq}8e--ko4D1x&x353U`?_~mIL@_OK>hyt*8XD! zX2y$xt{5b&mY;k1BtqEgN+K~yLq^t6JYuEUa`)502A8fArd!l6_tTs8BIDgEa`;bc z^of(=G7bi{o{El_PuTmT+a=0At6}=B(bEJ2PU^}<7mH1Ly*xQ0L@gz-91|AwBwXah zhSt_)JGV(ZNZMfkbf%~jB~c@@O83OfJBW6_+x9lpwB*d>w>>vkh^D8%?LE@vJ3HZY z@6Favst@n)tdU=};j4Yr%5&#l6$WfO>3_=Y#!dNM=dG_N=s#<3h??E8$*BF5k+^Bi z?C&Y2pC*)4ggWJz9LW_}UxO?Z3ib$k89rS=7j5(G{3_G4M;aYq z@0jljcmdZGexUS1Fl*A(S>it*Mk^N|IvH|mp@~ZPv^?kLt(m7g@uaw$5;t!Pn1&Q7 zgzl9*m%eqsd3yIWG4qc;`6@y#?@LdePTrdNbN#GC*X?wX6YbBRJ%6j=o&_fDpk`;% z^h;ZFo`_jVd~@Cvso|+0E8e$N@6pe-8J??6gZA0d)W2GO^zth{GY4_&m5QG~>0RSm zp^b9IPH}{=DE_%K145Kn#H2nAN%6M~n`b{YLsruz1{Nq0<2Tt)K?qTHOKf@CcyFyO zRTESi?K@!W52KuLB=5LsWe29mg$UnzN1Yn=#m=tN>balC&b8~#OA&)Jq-L0k$bVFI z85dfgZbJHv+S?gwu)!pYpmScWTrSFZJc_YIBAN${K??n57~+Wt2Oo>mwJ?L zn0CWb`+-q^f7I7`gmmjG^4BgOkjU7v$n`i)<=LzEX60C&b7vbVg~_WOu#2yWORoJ9 zog=a`?74)8%0@Y#=R_U2ZM|-a?)pp*&11_~H3$V<{-{ZejR@*cXN?tQu@rxe@qB#W z1V$Yz;M`-yDXkrq@M*qXebb5yvrj9%Nsl)|Q5KfSN$--jJCZK=)q-mCqQj~GUaI$P zzwvXXNa$<2x#^oeGd4ARUyv2_gC-bRK36|bt5~|n$icYker#`Fzj$MVJ#RU-@a)(-kcw zG*azmxF_8*zZCc4`4xG%Ug*Ni5Gl1>A4KPwFtx0o#vk9}!;UPGU68n1@@sz8k(e!A zE4B6&sR-{CUfiO$@2s8j?Zt=`fv`QK6mfae`Yr2a37g;b#f^KB=(j{XeP?-Mk<(`l zhYWq2itM7={FJaNkLJ>bOg{_qp3jn(3ZsRdISKCE-;Ib+R7}@9^Qfjx^O-?RXxX;# z=RIW$Kh;}ANFS}^kE3q%y?sc|i_d)Gcn95a6z7c#%`hcd-tybzLJN<$ukjpX}Wy{)I0AC)NB(k?kq2TGym7Y)Ctj3m)+7|Q1SeL!kh7` zeQ(lLHyRbqT)uzJji}zjs2QKHEH(9iE^(E=E{Z?(OZ zaS>QxiGNows}pH*T6M>&`0DX7lJ1-($D8ZLwAF#5*!?Aqdd@1T#fYJi8b3Q z;iCDiy%GkOlMY^xdz4Ipbp)qsNq9VzFZPO)Ryx!fQC^{&{AEw#`R~EHD^x;0ce|bI z3U@aYE67^&wZ6yp-}qMD$M4#cj<$OY4fawCQFR zNbh~BsooS~Pg^k#CD}?l_4$cM*Da4t0@mLQEbgqcNjF}my(CflmqeYh$@$wPTGU%) z3t{aJeXE4;_Lm;p*B;DURJ-qeX+zabsh+mQOQK0x>IcQ2N>ki1Xcu(Fim6!9Lq~4- zo{sV^*!yyYd-Oe@wQ-V<50rMene=JT#)lRAcV#YeZLe4wm6m@{dP(&MCyNK|p4~D5 z0_4a*6XSOY^6+)RRv$Jx*?dXGmBgg?+($3(n055U#y!wY2jwk+(x)=WWz;VqU}A4_S6o)-hF%a z?cX8txVdy!b#r|~Tu<(uV1;PSIBX1=XU&>eC&1n!+6<7?Rw2`mrMG`VjgurkGt*5=Y^*^?cds)7% zsnTfg5mB8syZ-8&?ar0fBDP<~mm^v~?bpd_-c#dKkoeNFF|P5Uh9o-Bd26ZRH~HFF z0lRk7W1FM{+AkiCKTHZuO}Kx)qGI-ma*cDjxM#$Lj_-=}GfK}Qt;&|IpFBy3>~gwA 
zt3Kq(qDPGtcIBlD;{Bxgg|PEgyiDU09z^*)sLqSCK?ZKKDAK=iC1GC80;9u6Wlm00 zo?Qo@GnJ%hR4%ycX{`3S*1L+6*5XveD|Oy{Obz$zwQ!Ny$xjS^j16vFQJr|xp5HE2 zWW{XS%-6K|%uvTEPXwf^-yzkup;ZM-4pj!;?a+AbrWW2XF)iVVnb_=!x$+H-G3jbi z)9`9GdW{BlraAFu&V+R84trc>;8BP3!v zl$+1r=>AiZht)fzITDi)d5buPP z%?8Lzrj1_>P|H@n@(VJ;Hmb;8mRUDt>r%hh5(nnz_D-)hS8M$0bn$f2`j$!I`dX?5 z&s-dQ`Hh?BK3e;H!)B!$*{6P;eiVGyeVd%O-{su4qfz{hPpIaoNj0C)IA* zM3kf+HaMtM@8v=B&MA0Tkn#Ofskye`y@sN2SnFw#A5NdEJdgF=?S5igzBA8iymdyN zS+o?QwAl691N+joJ`PuiuV1vexi}Rh?~67=l*czYT)I`cNBy1tiEgQx+2+AUb1w_K zUn@DYO^k}3#;yeO5%1 z+>0B}5z4YNf;9A%;g*67+O;E){eni&CZg z4wXcoxkwet(CU=j^Sbd#bdwMjPof4GbanOTXeqlKlr*|OrcO5~eo|0y0tSzuce+7c z6ipx_U_?9~58B-rEQW}NBj_DwZgsm6s2qdbiGOywEBl&vaj)BG=_NJSF?V**ZZ zo05@adN~(V60kHpfq;S&F#zib$T(fT$5^+M_iTT(xAmXgiRVn*iKmK92L!M6b?&@( zy6mypJKyppOz znD{QRXsgmkt3xT>b9GCpT7qeTBeS0*EQ%x`t zOB~$g+pEm3@wJwMCeFL{sRa-7_zohQ+m|ilnLCkFG`0KsuzL>FiaQc?wW$;o3Pwg@ zK*JP=#RD!5iVQ3bhDyJiVI&>WX6|tEG5gzObI?^QFWkxt84FAmE~CXe1Vng<;8nr-r71 zsx6)Z+Oik|frvx^&`8if837q5*BxWPWn)n-krCbI!n698%CBAo|2XD=Yp%0+tO3^? z1l-5;fV=K@z{NJV=X!f)HK!K}-^K0CNXh(W%cg5y&$erRJ0nHZD*F$!h*5Iq-&>i@ z)uP}a$db@lBpFR6!Qf;xuqXuhf=A;pFe(xO0xpt(!T?s}2zrBBayM!BHl|w?0~#b_ zr+4%TXc`WQp(0_Zfo=c^m^OimN5QBl1RRMV5&)Twr*6PMjeTq9gJkZ;4){C>9F0r@ z4JAPE1K9-}^aqI)I!%;{#ZnM7GLcIAe`D;ifSJjzzmzV-)L-g7G}K?Zn;rbSzx2A0 z-d|eN`O9*kzZA`(zf=dczjUu=g#J+R3&tab zyV}NXQ2)Ugb@{(OMlsaZuAg*xAs0Rt0ubj&CpkWZG z^y39ThGb?NI>K9&FVrA;*(g*`pzJLY9?s zMQ#4E-5&L4&w6b+n_sqlxhOHOee%Q?%Gll82b#Hd-JqF!_wz|eCyV_s8gvp|%5@GL za8`#}@L{5zP+Rbs3+dd);*k6)$yYE2^Ct=(w~vsW`?0;*vFpy*TktglH0ZSh+($$nGztbsk*Op+o(c%r;50!a2q+j4Tr*(c zUPz|U2q+v4P@+dT@}P!+MozqXcF~)a9l(g*fR_1(9WW5R{|ngxMm+(mxmy%Hvjlo0 zP^ci~CBtce9e||3NCZI2!clPu90`k~fuV?z^ajIi(NhQFs z7&s0^fMJQitk4Ad_y~F+DW;-OBoqz@i1#BQgJw~NfB7b(pf}>$51O^XMZz`D7K!eH}1`Bv+c#u?Ls93;YLV?;AFBkN`3?}v&CKH2+ zk*Yun1Ynp9hbMqs1qIR(D*a-BBZFHoo`{7JNn{!cjwVo0Aa5D-(egJCKvEkcSutD@ z+`}~xz=6Br;RF;O%xA&i1Tq}Z2jOHG9*Y1YTSP3HKt)jqSWw>{L2oR9fI}hsaFTvI z>n%9nT2GNHG%%C}1LYkGAjA@32n5}vXcQ`pKqOOOSR@J*-QgrEfq>)%+L~4VlUSia zOYwLJQc$HfnBp-MYW*@rf~(X(`D@HKbg0mhA9QYIjqH>BSFPceEAXOEXt{!cNG?x9 zLC#0OP>>(nJ7!-_x1k`yno(!TF0Zsr1C4~q|!e|U#1SqBx zKsStQ%;#WhZhqZTtkDZnUlj|=tvFD5p-}KJ8Ucd=J^`-Hcmx)Z=}Dj$OhnP}WW;}J zCjEc5ugWkeh)WJI;5JHxgM@|z!=mYGBY`O!@CR@h4n>6{(I97~E0SA&!@-~immDyl zb#_PxbGLOKEDEX{?rQFTexWRM!7F>oXa zPDUZnBaV?kh1Pj4;UFutJ+;NlxkBS*A{;8TyP~KCtk60-6ce~YgM%{7Sl3oSg;v>8 zR>B&*dL2_#xu#w`97YgRucd!SL=9eW9T8<*sv%AG^1w2JsJ6e8y z-4VmX3?5{)EnRLe=QrKSVePUe>oj5Y>bz3XHU))-lTS$%j=hKzH}~gy?~mV+{yzU$ z>(}m2r@Fp*1wBb<)jcGrcySHff6>je+aj*eyQGc%bS4Ml50j&K=e>}!rrlcmU(l&@ zPo@7*=i;`YN-itPxG<=57ri?FQ0E4i9KuCgCxl_QQKsZ!gq`roA%15b7g&6fS!C^_q$&3)ZYAg|05a z&$XR*Vs)e)e%(?Dy_<6d+VRqbALL#fnKCH^QQlImYd2B0a~P$z`0BhSu7?{B_fx2P zOa2$@3ZHFA50!r}8!7!Stt-6Ptz!rwT>=^97$?kG(j^Y1M=qKyPrw3o<8T{~sWV zj`n&+Z()v+NX=pc9Bz*Pyq-b2Xz)T}g$nJT7Y!(k6<*p2oXj)y#9zh=m!%OSGgcsP zAsK5>foe~KTSypQm}k9j4);jR1Hk|)wD+qcC$know&Xvew&kCWE^2X3^Cum)u*FTFiARu(ZQK@Sb8{awmC+NHb)6V7#V zJ&kzi#SW=$gc8}1+UU#h_5ZFe=eD|h-1xctj1*^RZyxwYh)?RzkD`3dOX=SZd)o$q z-*4M6>^&0J8neD6E1zNTCHeTG4jK-B$$#%H95_K^gcsJyYG!D)!=e2vZ{ffR6+;KC z?@7Rn>MYp6!J-3(z{b8S`FGOE2^vFStnW#{3=Iv|4Y5G`BQRdJ0h_ObhKhCLJ~OH> z6WE~QY5zVnRIFzgGpfjmY*6t)V1p_v)`XK8m6hPgsCe6+4oz09tN~_JqQEakvMb)U zgL6W~a75O1*UYF2rw&6k=!m?S`)C}IH3MaaMgq0Rkp;%f@@r@UV=cKbqk1ef4Ar2( z=)CT+rpcob80&6mW@t3wVbF#`{n-0>P`#0La)=q#Y_Pg*L^bj-g~=He!|qvE`!b^{ z1^aJVPz|b)_eH`?KtE(V&c zSlJ}Zs2X7-qvB=Qi?gg4#>G1Q$&4yOeHf}CyXPU%oezSe584hhQ^7F7dfwNyS%#3w*oEg;)O*R7KeUhA4=bWO&4jK=u*0}4EAvM<7ab~Gq zWe<(lNpfgXV}F$EXtR?V4~6*ANR4%NoLOpb*+Jul1IZm4!#UX(2V9p7liJ|%#KYV# 
zROe*hOH>05$`LD*a4@IF+oUu!Gh>~NX4YjPTWCB;jJqxw&dE9(%?ypC%T{WhKAA`25s=<vSMF|x(xUSEFohsknuoQhKh=HaFbbs8;nLq#Y=|;Dk|2oL}pZ? z#_UjyeTfcdRE+S&I(5g4s&M5nRE#^x*n4-QiL9*jcV=j$RcxX0I)ewz8`;;K(cKW;4>XuQ<;xtkdyhOsXyo*xcvNS9;Z ze+N~U?7RIYrfdX8@Ai+?#JCHLp-Wb#KC@qbVGE6iBH3u5v6q^k87HanG?B&00~lt; zI)lY5HHCG_~$cdL`DFwQrSugQI!`Ic~UnQ4$am&l3_55HsNZKnFL?)+^cH)2O!A z_HyBV6^R=Q-#WWv-ToojqWOiyPxp~c1CVN7gB0;?JoXaJqmGvfmhsneHT$r4 zK}=W8RG2;9yCJpu`?Xrg))x!SNToDMXTPO)By@}q^YMjyVVLNB4<0*o@C;d2#^lYX z85Ag>G+|)FlCzcPW8eFI@699vEjU#K|B@wpEA(L`Hq>oW_cI%p*!>ye)!rn3iG`_6 zXL2Dsq7ki#Z=mi+ZnjX7V1E@LGo`OsE^P7hf@I6u6-49$9hy4q|HnnS?JlX zmd{s5>80~aNAXt_yE9)_K4*C4c>DRg;-?J~+bps1d`4wc@!UyYebZ0zJ#%Bxqkzda zbP6vuz_wKNQ{#OieK)9ZirX3nzp83W-amAOMnO>^2_Omp2BP2q2ox#~Kw`w<082}- zI0l16A)pxX-|0#e?SkG(U$|f6mL?)7d!&WLM?2d*SFsB;5C8<1z@m{rC;$rt!2l2p z770LNAqapa8UjZlU|=j5hW&5QW@xbd_y*38rI``J8F%cC1`YzC!7wmV0)+v$=(zvT;t^v}1*1ZC4uw6vIx85+MBB%Rlx{ z_1lNO7?D$z4Y0Q`9W60qwQ;t*i)Ar4P_i~a&*r5L{Ip8{Ozj<2n>Lk{O4qtVKX-1W z`<^k0PnKJib>Q`;!&icI2MsI8mLS5d*2&nXe(4e7nh81KF9q8g6)T_5sX{Ni#4*nH z3J?!2T?pu4+jvj9Kb7s*nEt1-e_ds}bt5peI0lPCf&m}|0s??o;?5@&8VCbmAW$>{ zg7Ym9ki?#M+5g<6{`qhn?W3YB^NOL>K^FL#V-*|5P(?1XP@*@*&&aEN1 z!;I#3NLM#=v=tU@>*Q$duw!iC!|j1}akX}I;Fbi7aJyPxz;a81fPNJCD>Kj-l5fQ_ zTf<_9l|@DWj$-hCh>O~|Iy#7Ap%6F_hJ^wkU>F(zfuIopBnsysC4g9{1PFo=2Sc%< zZuU;Q^^TUuNAGzizMzh;zE_7&8>7a3@rRe(GcD*lEffMm!eCG=0E(5sAw$H$03-qh z#JMR;Gy*FDhl@jzd!f~9%8|oAedThR^eAx7@+)qDc91E)F#rDaopGn*H7!=gwgE^R zy0{UUTKX>L&Jk~0wJ^KfU^hW_(btH>jn&7WDcY z9_R*pb`S=aM?2c;lJ{T17@n zL~?rR;vQ-od0#1$1$Q>F(vmJXaG@SboPa=XZ$bx){@hW!iMqd>H4A6$Ig;LKQ#EI7^la({XL6I#l`10v zsZ?fbNW5uW@45WyAl0ae+5rCGmib8zB78;}SS-E{Z-Dh;##?s@=&8i82{-hdsG5u?r&opB3#_HSCVcP0! z4c4aUfYi`1hNSeTZCplx2;&-lrqFO=Be5%i{Dv4p0|VYOMG>8Xmm5@P?|sVPWN84t z%paMNk)7;uLL{?$)Jq()1ucYYE5o#M=&CXpCqPm>OCJ8 zaen=5;92F@i;rsMzdrcdsNJSg`FM00L8r#Sm+5(<^q$O_G;`6XZ7!|?!=!-#lg-7$ zQnH`wzcf9CBsJtEL|dQXPV&Is9&-}*uy&C<)oXz_3i8g%l#Woe0Stp;K$PU1O=!lz_CxYwf%NXba z(tyVW2gy@JU~Y=^Aw~ShZ_}$4=@`5#)CH)<8=q#0I9yZzHjUHZ*{L(~gqQV{V-Mv} z#ID*XC?*vfm+C3xQqcK(MAk@l&kX~>OyQeeP1HBV%x8%}Eo~hEpBFy7EECB{jP*Qj0zPQ)%1|5ZZ86LO_H zLGkMq{eB_62CLG$ui`!W`U)GcEh$r&tX76!9h_ewZFOsFKkoSf6)H9C*^YiI_ zO|jAQC-Zys06wvoy1%%KUOzbAS4Ypz&Eg|5Ytp>1uvn7bcdmN4B0V8dvhCW@N1Dh1 zOxL7|MWCS8!y6_A_I;A=V`&}@rsu{EpK0_I)-aytVv}i%!umuTs!T)5Z35wM>@rRs zf4)+}>h18A=0e^_SkPR~pmmkTgwnmvVHfBFj$1-rZ!DL~9-~!Dzv)f~k48iM{nA{X zR%kXtry&MPSBE)UOjOA60Cp2%Q5%MqL#w6=a79D&mc9m zY3Q8S!HC>L=?Ir?i-7xu{8~UgWinvVew^`2-VS1=c(JS2b13AFH51<^3khZ^8nvB z2^qsq>5T~I&Gm9-QPsA_C=T%?R*_9EO73~4Gg6SHp@g7!ZY#V8WiiV!XK9OD{`!)2 zXbqygOy~l7dWLv;7QVUa{P~t+p$W%@$&0 z3Sk)ydL9d3NaQ^wuzWg`QE5e5v4-*%zo@*@29)w*hk1-MNRh`pqavuW@xy4Y$?tM)fP7$8h6wAPuwd>zr2L6Usooqy=DskBLpvwb1g1bdbh zJJ?NYUQJ%2IW=q65KzqZaaF44G_}{4jTf0KQDz$!X>(s3gBAsriiKzk9SemyE-ckQ zUCtV&lJo80bX}%@wi!aQd?7M5E_~)}+v@NzQ)EQ!D_#c3OPpyG=4`R(bOg4K{J#q60 z<=i~iw3(GKM|r>us zN)SGt-y9l~z9YrnZ@)}CXI z5|uA2lT9Wtt}9-uza{iqfqg*MyZqS5#(7R3k>>==o+W=V2rBu77m7Ltc=@GW$s*(+ zwHoIPwVVVUlCcGveOIh|AmYGapyE5>+Yn?yGXM#`ev`#H` z%vT1-ItG1ThSJXe7ZIKsNm}j_ep^e(&9|xpX$A8ex^N>n&X}Z|`xvZE^W;>gr8aRWG_wlkb?3LZgeiW~oU8RlugzoAohi`Xf2j|> zAY0)RNbHB<%rIW2jpax=W#4jwi`dt#$3ksAa_umWslE7$d1Nnbd-PrUjK~DENURTA zRF%>v{&=?KzQ(x=7%w;4R7^7^MVNeyLw8oC$;D?KHcqL*K$0~DbvlP#q+}F;JICz zirKvo*KbiDE85yH+zh?UHx-vOW|JMkoTla30ZuPFlOAT}-aJ5SSu3yw}#RV(ev+afxio%cYD|0R(AhBb9ZRG=n&0H@!RL~nQ=C!ai)(yw;JGT1!pNdx2>xAx8ZOhHxi->i{_sy@vw76c1ALBN7BDxC^jxp)2%MPUn2gA{~K-89lM5e z$2_%7JWj7t96K5LS&>CtG9;(}!t|P!?_Dtkcrry7zRf}XkKgkoQu&MEt@e;Rs6QkpM&2m$g%R~=O1b16maSN+=D|a0b z>sN5$Do#;M>$$I&X{a|=ROk}e^&*D$G^5PA&1Z>mzW3AM<|b<*?Wr;)_7CJLOGoR< 
zRpQMgkG9i$*maekIEtA(`x<2Lq!Uo&LwC0I{NekLfg>)PgJ`=(o|RmyQRF56Q;b4X zMZ78zZ0^rS!Y1tFX&kP%1XsynDpiZyoHy!nDFf7EXmw>e4IPgjFLJL7CQ$=KbEOXs zoJ`Me@ZqY|_j{3=o=sOvG?>%jcKQfOWmVa=5Ajz$A3cVoP0Pk#IPar%vVm0vC`X=Y zGa=$feKX3GbMr%X0JHiz4lRoqudf^%3dS?pMp?uP4foHwX3@ekt!fgp$KJSyykgl* z)8~!d?qek)v8obYxsdU;fQ{ z{5>6YO*=YL-J%Uo(ZiV$hhi4$I2M;g)XwxkOHx`duZaYf7EaV=M&;2R8*K*ISH~-;Qyh0=%#Zs*B zLO)I|f!CO5T;K4O29H7l3JY`V{Br?*A9wTy(9&v88PBLZ|Uz@tev)WU4o1w8y9Wp z>KfHJtF2KeN!b-1#xgqP#M4Sb2boYgZ)m1nqHlDqFWye%Hgk^tD69~2Jvk}N^Lh{& zdT7)|Do59jCi_ErlEOM~wVr2ziCvmKpF<;jR)a4ls5ipV+;Q5W@o67pO5y0^uN4vB z&>vFfmoJZ+yNa5E1ya<{fp2Blepsb(9tcks8tjQ@)T}w|T{HNR%l`)dvFu*{et@sE zKb$PW`C3|mboYA$iWK(%a(j$6MUE@k_4_0oEC&wXd)Gmi)lF;j>6Lf~M8^D8e~V1g z=}e#gFWyaMO6nVCkFHHXCs}z4kOE`+8=wvr15$#C7h{qH{K4JKJ*Y57k8$ZX4ty_WVaiEP zi}PSxW6$Zq?>5l~Ag=C?KQy`ueWKiO?{e!u$M$uIt|I=`># zJW}gXnIJA!xr!S*bqwvd3uQ-@>FVMdDrlqK6J;kQ_(G(wK5RZHQAvHzed&EF+2a|5 z&>UOIE7o`Utd*NL?)88k3DCIsF8a0jd1B_5QY?bW^A39udiCDKI7QKN2MIrsO<61C z4ArG!^OA{5kgAL zYa>T>ddZsSBNogGp1S4VHJv2c>KlTQHW9KJTh|Qw%^Dl)k_kUqc`~_pu7VG2c$f7V zx1pilkx;-;bje|Nv9!B{A72_t7&_!0%(lKpS3^|6(E22N{RQ^f$XKN;*PO3&U8#Go zs(q0eJ-_s2Z#u2HYCUDaqei--ts<#!KNgB~xj}{BL$tu&9F7&f>1`vXpOnQ935Kwm zzx#Y-t%YXmK>xYPc3Yg^@AltWWB=>%`VT!`gha;<{B&`+;P0R3(BCEXt&g+>U*xx5 zf9dig`QGLCOJALi`qq?x(*IVK{`->?pn`jYH|4f2vi$}g2KqZu{rsZ%?QGkr=5Qwq z9@W3s+_pP3{4irXS`^VA(f-@!ww=&^>Jr~!ZbuWurGxOK_Kzosk+dl0fa9&XEY1!W;(sye|_zC>&5LAqRhS*y=e|O@*Pp)s* z!paULztO^vRQv9h#ZS*}UsjCe&jR?(vOkiwyIU4N@3?*0Hr72Y`y&gv)3QI0K>SSC zc6GmG|MRlHJvR20_u3^TUY2V+rSGwwD1SXn_n7nA9UOj^Ydah>$4+p6G?_i-ymm*4 zpLy6$S^q2L_84!!q38EJ#O~1k9OD^t{vyDiX#0<({vEXWozV81BiReupA{O*^+&1g zKc@P3(1f{nK-=%7_dlckbnSl*+M9nx+ZMF%SD!mO-JewWiJa}5)0_7ff&D`T1fci% opNn=&_2-~nnE%gI`_k literal 0 HcmV?d00001 diff --git a/tests/test_core.py b/tests/test_core.py index 0f46bd1..1a315c8 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -1,7 +1,10 @@ +import glob +import os import zipfile from io import BytesIO import pytest +from dask.utils import homogeneous_deepmap import dask_deltatable as ddt @@ -16,6 +19,14 @@ def simple_table(tmpdir): return str(output_dir) + "/test1/" +@pytest.fixture() +def simple_table2(tmpdir): + output_dir = tmpdir + deltaf = zipfile.ZipFile("tests/data/simple2.zip") + deltaf.extractall(output_dir) + return str(output_dir) + "/simple_table/" + + @pytest.fixture() def partition_table(tmpdir): output_dir = tmpdir @@ -48,6 +59,14 @@ def checkpoint_table(tmpdir): return str(output_dir) + "/checkpoint/" +@pytest.fixture() +def vacuum_table(tmpdir): + output_dir = tmpdir + deltaf = zipfile.ZipFile("tests/data/vacuum.zip") + deltaf.extractall(output_dir) + return str(output_dir) + "/vaccum_table/" + + def test_read_delta(simple_table): df = ddt.read_delta_table(simple_table) @@ -129,3 +148,77 @@ def test_out_of_version_error(simple_table): # checkpoint 0 are [0,1] with pytest.raises(Exception): _ = ddt.read_delta_table(simple_table, version=4) + + +def test_load_with_datetime(simple_table2): + log_dir = f"{simple_table2}_delta_log" + log_mtime_pair = [ + ("00000000000000000000.json", 1588398451.0), + ("00000000000000000001.json", 1588484851.0), + ("00000000000000000002.json", 1588571251.0), + ("00000000000000000003.json", 1588657651.0), + ("00000000000000000004.json", 1588744051.0), + ] + for file_name, dt_epoch in log_mtime_pair: + file_path = os.path.join(log_dir, file_name) + os.utime(file_path, (dt_epoch, dt_epoch)) + + expected = ddt.read_delta_table(simple_table2, version=0).compute() + result = ddt.read_delta_table( + simple_table2, datetime="2020-05-01T00:47:31-07:00" + 
).compute() + assert expected.equals(result) + # assert_frame_equal(expected,result) + + expected = ddt.read_delta_table(simple_table2, version=1).compute() + result = ddt.read_delta_table( + simple_table2, datetime="2020-05-02T22:47:31-07:00" + ).compute() + assert expected.equals(result) + + expected = ddt.read_delta_table(simple_table2, version=4).compute() + result = ddt.read_delta_table( + simple_table2, datetime="2020-05-25T22:47:31-07:00" + ).compute() + assert expected.equals(result) + + +def test_read_history(checkpoint_table): + history = ddt.read_delta_history(checkpoint_table) + assert len(history) == 26 + + last_commit_info = history[0] + last_commit_info == { + "timestamp": 1630942389906, + "operation": "WRITE", + "operationParameters": {"mode": "Append", "partitionBy": "[]"}, + "readVersion": 24, + "isBlindAppend": True, + "operationMetrics": { + "numFiles": "6", + "numOutputBytes": "5147", + "numOutputRows": "5", + }, + } + + # check whether the logs are sorted + current_timestamp = history[0]["timestamp"] + for h in history[1:]: + assert current_timestamp > h["timestamp"], "History Not Sorted" + current_timestamp = h["timestamp"] + + history = ddt.read_delta_history(checkpoint_table, limit=5) + assert len(history) == 5 + + +def test_vacuum(vacuum_table): + tombstones = ddt.vacuum(vacuum_table, dry_run=True) + print(tombstones) + assert len(tombstones) == 4 + + before_pq_files_len = len(glob.glob(f"{vacuum_table}*.parquet")) + assert before_pq_files_len == 7 + tombstones = ddt.vacuum(vacuum_table, dry_run=False) + print(tombstones) + after_pq_files_len = len(glob.glob(f"{vacuum_table}*.parquet")) + assert after_pq_files_len == 3 From 66630ee7ed345370deda4f36f4e06ddcf0f140f9 Mon Sep 17 00:00:00 2001 From: rajagurunath Date: Tue, 28 Sep 2021 01:49:46 +0530 Subject: [PATCH 3/8] ZIP files debugging --- .github/workflows/tests.yaml | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/.github/workflows/tests.yaml b/.github/workflows/tests.yaml index 6ed8061..21ea624 100644 --- a/.github/workflows/tests.yaml +++ b/.github/workflows/tests.yaml @@ -42,6 +42,11 @@ jobs: - name: Run tests run: python -m pytest --junitxml=junit/test-results.xml --cov-report=xml tests + - name: Setup tmate session + if: ${{ failure() }} + uses: mxschmitt/action-tmate@v3 + timeout-minutes: 15 + - name: Upload pytest test results uses: actions/upload-artifact@v1 with: From 46ec3b0f0875a5908dfff43a5e75e36786bcb207 Mon Sep 17 00:00:00 2001 From: rajagurunath Date: Tue, 28 Sep 2021 14:57:11 +0530 Subject: [PATCH 4/8] ZIP files debugging --- .github/workflows/tests.yaml | 1 - tests/test_core.py | 4 +++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/.github/workflows/tests.yaml b/.github/workflows/tests.yaml index 21ea624..ca26413 100644 --- a/.github/workflows/tests.yaml +++ b/.github/workflows/tests.yaml @@ -45,7 +45,6 @@ jobs: - name: Setup tmate session if: ${{ failure() }} uses: mxschmitt/action-tmate@v3 - timeout-minutes: 15 - name: Upload pytest test results uses: actions/upload-artifact@v1 diff --git a/tests/test_core.py b/tests/test_core.py index 1a315c8..e661bcc 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -62,7 +62,7 @@ def checkpoint_table(tmpdir): @pytest.fixture() def vacuum_table(tmpdir): output_dir = tmpdir - deltaf = zipfile.ZipFile("tests/data/vacuum.zip") + deltaf = zipfile.ZipFile("vacuum.zip") deltaf.extractall(output_dir) return str(output_dir) + "/vaccum_table/" @@ -212,6 +212,8 @@ def test_read_history(checkpoint_table): def test_vacuum(vacuum_table): + 
print(vacuum_table) + print(os.listdir(vacuum_table)) tombstones = ddt.vacuum(vacuum_table, dry_run=True) print(tombstones) assert len(tombstones) == 4 From d017de38990943b0fb5854a422ae5b15b29699ad Mon Sep 17 00:00:00 2001 From: rajagurunath Date: Tue, 28 Sep 2021 15:09:36 +0530 Subject: [PATCH 5/8] ZIP files debugging --- tests/test_core.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_core.py b/tests/test_core.py index e661bcc..70ac728 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -62,7 +62,7 @@ def checkpoint_table(tmpdir): @pytest.fixture() def vacuum_table(tmpdir): output_dir = tmpdir - deltaf = zipfile.ZipFile("vacuum.zip") + deltaf = zipfile.ZipFile("tests/data/vacuum.zip") deltaf.extractall(output_dir) return str(output_dir) + "/vaccum_table/" From a9b9193855dc025f18925ba7bfc3c1725eb3877f Mon Sep 17 00:00:00 2001 From: rajagurunath Date: Tue, 28 Sep 2021 17:53:03 +0530 Subject: [PATCH 6/8] fixed windows zip error --- dask_deltatable/core.py | 2 +- tests/test_core.py | 7 +++---- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/dask_deltatable/core.py b/dask_deltatable/core.py index 2bcefc3..6aa2f0b 100644 --- a/dask_deltatable/core.py +++ b/dask_deltatable/core.py @@ -86,7 +86,7 @@ def history(self, limit=None, **kwargs): def _vacuum_helper(self, filename_to_delete): full_path = urlparse(self.path) - if full_path.scheme: # pragma no cover + if full_path.scheme and full_path.netloc: # pragma no cover # for different storage backend, delta-rs vacuum gives path to the file # it will not provide bucket name and scheme s3 or gcfs etc. so adding # manually diff --git a/tests/test_core.py b/tests/test_core.py index 70ac728..42dc1b7 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -64,7 +64,7 @@ def vacuum_table(tmpdir): output_dir = tmpdir deltaf = zipfile.ZipFile("tests/data/vacuum.zip") deltaf.extractall(output_dir) - return str(output_dir) + "/vaccum_table/" + return str(output_dir) + "/vaccum_table" def test_read_delta(simple_table): @@ -218,9 +218,8 @@ def test_vacuum(vacuum_table): print(tombstones) assert len(tombstones) == 4 - before_pq_files_len = len(glob.glob(f"{vacuum_table}*.parquet")) + before_pq_files_len = len(glob.glob(f"{vacuum_table}/*.parquet")) assert before_pq_files_len == 7 tombstones = ddt.vacuum(vacuum_table, dry_run=False) - print(tombstones) - after_pq_files_len = len(glob.glob(f"{vacuum_table}*.parquet")) + after_pq_files_len = len(glob.glob(f"{vacuum_table}/*.parquet")) assert after_pq_files_len == 3 From 4212789a640dbf2c7099713939fff5b8bc4fa95e Mon Sep 17 00:00:00 2001 From: rajagurunath Date: Tue, 28 Sep 2021 21:15:55 +0530 Subject: [PATCH 7/8] remove debug from ci and updated readme --- .github/workflows/tests.yaml | 6 +++--- README.md | 4 ++++ 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/.github/workflows/tests.yaml b/.github/workflows/tests.yaml index ca26413..55fb567 100644 --- a/.github/workflows/tests.yaml +++ b/.github/workflows/tests.yaml @@ -42,9 +42,9 @@ jobs: - name: Run tests run: python -m pytest --junitxml=junit/test-results.xml --cov-report=xml tests - - name: Setup tmate session - if: ${{ failure() }} - uses: mxschmitt/action-tmate@v3 + # - name: Setup tmate session + # if: ${{ failure() }} + # uses: mxschmitt/action-tmate@v3 - name: Upload pytest test results uses: actions/upload-artifact@v1 diff --git a/README.md b/README.md index 47f9fc5..d871652 100644 --- a/README.md +++ b/README.md @@ -45,4 +45,8 @@ ddt.read_delta_history("delta_path",limit=5) # read 
delta history to delete the files ddt.vacuum("delta_path",dry_run=False) +# Can read from S3,azure,gcfs etc. +ddt.read_delta_table("s3://bucket_name/delta_path",version=3) +# please ensure the credentials are properly configured as environment variable or +# configured as in ~/.aws/credential ``` From c7d004c4c78b919ab875f596b896969d9d60ba99 Mon Sep 17 00:00:00 2001 From: rajagurunath Date: Tue, 28 Sep 2021 21:22:14 +0530 Subject: [PATCH 8/8] removed unwanted pkg import --- tests/test_core.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/tests/test_core.py b/tests/test_core.py index 42dc1b7..c825caa 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -1,15 +1,11 @@ import glob import os import zipfile -from io import BytesIO import pytest -from dask.utils import homogeneous_deepmap import dask_deltatable as ddt -requests = pytest.importorskip("requests") - @pytest.fixture() def simple_table(tmpdir):
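
The series above leaves `dask_deltatable` exposing `read_delta_table` (with `version`/`datetime` time travel), `read_delta_history`, and `vacuum`. A minimal usage sketch of that API follows; the table path, datetime, and column names are placeholders for illustration only and do not refer to files in this repository.

```
import dask_deltatable as ddt

path = "/tmp/delta/simple_table"  # placeholder: any local or remote Delta table URI

# Read the latest version as a Dask DataFrame, optionally selecting columns
# (placeholder column names).
df = ddt.read_delta_table(path, columns=["id", "count"])
print(df.compute().shape)

# Time travel either by explicit version or by an RFC 3339 / ISO 8601 datetime;
# the datetime resolves to the latest version created at or before that instant.
df_v1 = ddt.read_delta_table(path, version=1)
df_dt = ddt.read_delta_table(path, datetime="2021-09-28T00:00:00Z")

# Commit history is returned in reverse chronological order; limit=5 keeps
# only the five most recent commits.
for commit in ddt.read_delta_history(path, limit=5):
    print(commit["timestamp"], commit["operation"])

# vacuum with dry_run=True only lists files older than the retention threshold;
# dry_run=False deletes them through dask delayed tasks.
tombstones = ddt.vacuum(path, retention_hours=168, dry_run=True)
print(len(tombstones), "files eligible for deletion")
```

All three entry points also accept `storage_options`, which is passed through to the fsspec filesystem, so the same calls work against s3/azure/gcs URIs as well as local paths.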