diff --git a/.github/workflows/tests.yaml b/.github/workflows/tests.yaml
index 6ed8061..55fb567 100644
--- a/.github/workflows/tests.yaml
+++ b/.github/workflows/tests.yaml
@@ -42,6 +42,10 @@ jobs:
       - name: Run tests
         run: python -m pytest --junitxml=junit/test-results.xml --cov-report=xml tests
 
+      # - name: Setup tmate session
+      #   if: ${{ failure() }}
+      #   uses: mxschmitt/action-tmate@v3
+
       - name: Upload pytest test results
         uses: actions/upload-artifact@v1
         with:
diff --git a/README.md b/README.md
index 651f976..d871652 100644
--- a/README.md
+++ b/README.md
@@ -8,7 +8,7 @@ To Try out the package:
 pip install dask-deltatable
 ```
 
-Features:
+### Features:
 1. Reads the parquet files based on delta logs parallely using dask engine
 2. Supports all three filesystem like s3, azurefs, gcsfs
 3. Supports some delta features like
@@ -17,3 +17,36 @@ Features:
    - parquet filters
      - row filter
      - partition filter
+4. Query Delta commit info (history)
+5. Vacuum the old/unused parquet files
+6. Load different versions of data using datetime
+
+### Usage:
+
+```
+import dask_deltatable as ddt
+
+# read delta table
+ddt.read_delta_table("delta_path")
+
+# read delta table for a specific version
+ddt.read_delta_table("delta_path", version=3)
+
+# read delta table for a specific datetime
+ddt.read_delta_table("delta_path", datetime="2018-12-19T16:39:57-08:00")
+
+
+# read the complete delta table history
+ddt.read_delta_history("delta_path")
+
+# read delta history up to a given limit
+ddt.read_delta_history("delta_path", limit=5)
+
+# vacuum the old/unused parquet files (dry_run=False deletes them)
+ddt.vacuum("delta_path", dry_run=False)
+
+# can read from S3, Azure, GCS, etc.
+ddt.read_delta_table("s3://bucket_name/delta_path", version=3)
+# please ensure the credentials are properly configured as environment
+# variables or in ~/.aws/credentials
+```
diff --git a/dask_deltatable/__init__.py b/dask_deltatable/__init__.py
index 6f2e377..0d3f019 100644
--- a/dask_deltatable/__init__.py
+++ b/dask_deltatable/__init__.py
@@ -1 +1 @@
-from .core import read_delta_table
+from .core import read_delta_history, read_delta_table, vacuum
diff --git a/dask_deltatable/core.py b/dask_deltatable/core.py
index ee142bb..6aa2f0b 100644
--- a/dask_deltatable/core.py
+++ b/dask_deltatable/core.py
@@ -1,196 +1,33 @@
 import json
-import re
+import os
+from urllib.parse import urlparse
 
+import dask
 import pyarrow.parquet as pq
 from dask.base import tokenize
 from dask.dataframe.io import from_delayed
-from dask.dataframe.io.parquet.core import get_engine
 from dask.delayed import delayed
+from deltalake import DeltaTable
 from fsspec.core import get_fs_token_paths
 from pyarrow import dataset as pa_ds
 
-from .utils import (
-    FASTPARQUET_CHECKPOINT_SCHEMA,
-    PYARROW_CHECKPOINT_SCHEMA,
-    schema_from_string,
-)
-
-__all__ = ["read_delta_table"]
-
-
-class DeltaTable:
-    """
-    Core DeltaTable Read Algorithm
-
-    Parameters
-    ----------
-    path: str
-        path of Delta table directory
-    version: int, default 0
-        DeltaTable Version, used for Time Travelling across the
-        different versions of the parquet datasets
-    columns: None or list(str)
-        Columns to load. If None, loads all.
-    engine : str, default 'auto'
-        Parquet reader library to use. Options include: 'auto', 'fastparquet',
-        'pyarrow', 'pyarrow-dataset', and 'pyarrow-legacy'. Defaults to 'auto',
-        which selects the FastParquetEngine if fastparquet is installed (and
-        ArrowDatasetEngine otherwise). If 'pyarrow' or 'pyarrow-dataset' is
-        specified, the ArrowDatasetEngine (which leverages the pyarrow.dataset
-        API) will be used. If 'pyarrow-legacy' is specified, ArrowLegacyEngine
-        will be used (which leverages the pyarrow.parquet.ParquetDataset API).
-        NOTE: The 'pyarrow-legacy' option (ArrowLegacyEngine) is deprecated
-        for pyarrow>=5.
-    checkpoint: int, default None
-        DeltaTable protocol aggregates the json delta logs and creates parquet
-        checkpoint for every 10 versions.This will save some IO.
-        for example:
-        if you want to read 106th version of dataset, No need to read all
-        the all the 106 json files (which isIO heavy), instead read 100th
-        parquet checkpoint which contains all the logs of 100 versions in single
-        parquet file and now read the extra 6 json files to match the given
-        106th version.
-        (so IO reduced from 106 to 7)
-    storage_options : dict, default None
-        Key/value pairs to be passed on to the file-system backend, if any.
-    """
+class DeltaTableWrapper(object):
     def __init__(
-        self,
-        path: str,
-        version: int = 0,
-        columns=None,
-        engine="auto",
-        checkpoint=None,
-        storage_options=None,
-    ):
-        self.path = str(path).rstrip("/")
+        self, path, version, columns, datetime=None, storage_options=None
+    ) -> None:
+        self.path = path
         self.version = version
-        self.pq_files = set()
-        self.delta_log_path = f"{self.path}/_delta_log"
-        self.fs, self.fs_token, _ = get_fs_token_paths(
-            path, storage_options=storage_options
-        )
-        self.engine = engine
         self.columns = columns
-        self.checkpoint = (
-            checkpoint if checkpoint is not None else self.get_checkpoint_id()
-        )
+        self.datetime = datetime
         self.storage_options = storage_options
-        self.schema = None
-
-    def get_checkpoint_id(self):
-        """
-        if _last_checkpoint file exists, returns checkpoint_id else zero
-        """
-        try:
-            last_checkpoint_version = json.loads(
-                self.fs.cat(f"{self.delta_log_path}/_last_checkpoint")
-            )["version"]
-        except FileNotFoundError:
-            last_checkpoint_version = 0
-        return last_checkpoint_version
-
-    def get_pq_files_from_checkpoint_parquet(self):
-        """
-        use checkpoint_id to get logs from parquet files
-        """
-        if self.checkpoint == 0:
-            return
-        checkpoint_path = (
-            f"{self.delta_log_path}/{self.checkpoint:020}.checkpoint.parquet"
-        )
-        if not self.fs.exists(checkpoint_path):
-            raise ValueError(
-                f"Parquet file with the given checkpoint {self.checkpoint} does not exists: "
-                f"File {checkpoint_path} not found"
-            )
-
-        # reason for this `if condition` was that FastParquetEngine seems to normalizes the json present in each column
-        # not sure whether there is a better solution for handling this .
-        # `https://fastparquet.readthedocs.io/en/latest/details.html#reading-nested-schema`
-        if self.engine.__name__ == "ArrowDatasetEngine":
-            parquet_checkpoint = self.engine.read_partition(
-                fs=self.fs,
-                pieces=[(checkpoint_path, None, None)],
-                columns=PYARROW_CHECKPOINT_SCHEMA,
-                index=None,
-            )
-            for i, row in parquet_checkpoint.iterrows():
-                if row["metaData"] is not None:
-                    # get latest schema/columns , since delta Lake supports Schema Evolution.
-                    self.schema = schema_from_string(row["metaData"]["schemaString"])
-                elif row["add"] is not None:
-                    self.pq_files.add(f"{self.path}/{row['add']['path']}")
-
-        elif self.engine.__name__ == "FastParquetEngine":
-            parquet_checkpoint = self.engine.read_partition(
-                fs=self.fs,
-                pieces=[(checkpoint_path, None, None)],
-                columns=FASTPARQUET_CHECKPOINT_SCHEMA,
-                index=None,
-            )
-            for i, row in parquet_checkpoint.iterrows():
-                if row["metaData.schemaString"]:
-                    self.schema = schema_from_string(row["metaData.schemaString"])
-                elif row["add.path"]:
-                    self.pq_files.add(f"{self.path}/{row['add.path']}")
-
-    def get_pq_files_from_delta_json_logs(self):
-        """
-        start from checkpoint id, collect logs from every json file until the
-        given version
-        example:
-            checkpoint 10, version 16
-            1. read the logs from 10th checkpoint parquet ( using above func)
-            2. read logs from json files until version 16
-        log Collection:
-        for reading the particular version of delta table, We are concerned
-        about `add` and `remove` Operation (transaction) only.(which involves
-        adding and removing respective parquet file transaction)
-        """
-        log_files = self.fs.glob(
-            f"{self.delta_log_path}/{self.checkpoint // 10:019}*.json"
+        self.dt = DeltaTable(table_uri=self.path, version=self.version)
+        self.fs, self.fs_token, _ = get_fs_token_paths(
             path, storage_options=storage_options
         )
-        if len(log_files) == 0:
-            raise RuntimeError(
-                f"No Json files found at _delta_log_path:- {self.delta_log_path}"
-            )
-        log_files = sorted(log_files)
-        log_versions = [
-            int(re.findall(r"(\d{20})", log_file_name)[0])
-            for log_file_name in log_files
-        ]
-        if (self.version is not None) and (self.version not in log_versions):
-            raise ValueError(
-                f"Cannot time travel Delta table to version {self.version}, Available versions for given "
-                f"checkpoint {self.checkpoint} are {log_versions}"
-            )
-        for log_file_name, log_version in zip(log_files, log_versions):
-            log = self.fs.cat(log_file_name).decode().split("\n")
-            for line in log:
-                if line:  # for last empty line
-                    meta_data = json.loads(line)
-                    if "add" in meta_data.keys():
-                        file = f"{self.path}/{meta_data['add']['path']}"
-                        self.pq_files.add(file)
-
-                    elif "remove" in meta_data.keys():
-                        remove_file = f"{self.path}/{meta_data['remove']['path']}"
-                        if remove_file in self.pq_files:
-                            self.pq_files.remove(remove_file)
-                    elif "metaData" in meta_data.keys():
-                        schema_string = meta_data["metaData"]["schemaString"]
-                        self.schema = schema_from_string(schema_string)
-
-            if self.version == int(log_version):
-                break
+        self.schema = self.dt.pyarrow_schema()
 
     def read_delta_dataset(self, f, **kwargs):
-        """
-        Function to read single parquet file using pyarrow dataset (to support
-        schema evolution)
-        """
         schema = kwargs.pop("schema", None) or self.schema
         filter = kwargs.pop("filter", None)
         if filter:
@@ -220,31 +57,106 @@ def _make_meta_from_schema(self):
             meta[field.name] = field.type.to_pandas_dtype()
         return meta
 
+    def _history_helper(self, log_file_name):
+        log = self.fs.cat(log_file_name).decode().split("\n")
+        for line in log:
+            if line:
+                meta_data = json.loads(line)
+                if "commitInfo" in meta_data:
+                    return meta_data["commitInfo"]
+
+    def history(self, limit=None, **kwargs):
+        delta_log_path = str(self.path).rstrip("/") + "/_delta_log"
+        log_files = self.fs.glob(f"{delta_log_path}/*.json")
+        if len(log_files) == 0:  # pragma: no cover
+            raise RuntimeError(f"No history (logs) found at {delta_log_path}/")
+        log_files = sorted(log_files, reverse=True)
+        if limit is None:
+            last_n_files = log_files
+        else:
+            last_n_files = log_files[:limit]
+        parts = [
+            delayed(
+                self._history_helper,
+                name="read-delta-history" + tokenize(self.fs_token, f, **kwargs),
+            )(f, **kwargs)
+            for f in list(last_n_files)
+        ]
+        return dask.compute(parts)[0]
+
+    def _vacuum_helper(self, filename_to_delete):
+        full_path = urlparse(self.path)
+        if full_path.scheme and full_path.netloc:  # pragma: no cover
+            # for remote storage backends, delta-rs vacuum returns only the
+            # path of the file; it does not include the scheme (s3, gcs, etc.)
+            # and bucket name, so add them back manually
+            filename_to_delete = (
+                f"{full_path.scheme}://{full_path.netloc}/{filename_to_delete}"
+            )
+        self.fs.rm_file(filename_to_delete)
+
+    def vacuum(self, retention_hours=168, dry_run=True):
+        """
+        Run the Vacuum command on the Delta Table: list and delete files that
+        are no longer referenced by the Delta table and are older than the
+        retention threshold.
+
+        retention_hours: the retention threshold in hours; if None, the
+        value from `configuration.deletedFileRetentionDuration` is used,
+        or a default of one week otherwise.
+        dry_run: if True, only list the files; otherwise delete them.
+
+        Returns
+        -------
+        the list of files no longer referenced by the Delta Table that are
+        older than the retention threshold.
+        """
+
+        tombstones = self.dt.vacuum(retention_hours=retention_hours)
+        if dry_run:
+            return tombstones
+        else:
+            parts = [
+                delayed(
+                    self._vacuum_helper,
+                    name="delta-vacuum"
+                    + tokenize(self.fs_token, f, retention_hours, dry_run),
+                )(f)
+                for f in tombstones
+            ]
+            dask.compute(parts)[0]
+
+    def get_pq_files(self):
+        """
+        Get the list of parquet file URIs, after optionally loading the
+        table version that corresponds to ``self.datetime``.
+        """
+        # datetime semantics follow deltalake.DeltaTable.load_with_datetime
+
+        if self.datetime is not None:
+            self.dt.load_with_datetime(self.datetime)
+        return self.dt.file_uris()
+
     def read_delta_table(self, **kwargs):
         """
         Reads the list of parquet files in parallel
         """
-        if len(self.pq_files) == 0:
+        pq_files = self.get_pq_files()
+        if len(pq_files) == 0:
             raise RuntimeError("No Parquet files are available")
         parts = [
             delayed(
                 self.read_delta_dataset,
                 name="read-delta-table-" + tokenize(self.fs_token, f, **kwargs),
             )(f, **kwargs)
-            for f in list(self.pq_files)
+            for f in list(pq_files)
         ]
         meta = self._make_meta_from_schema()
         return from_delayed(parts, meta=meta)
 
 
 def read_delta_table(
-    path,
-    version=None,
-    columns=None,
-    engine="auto",
-    checkpoint=None,
-    storage_options=None,
-    **kwargs,
+    path, version=None, columns=None, storage_options=None, datetime=None, **kwargs,
 ):
     """
     Read a Delta Table into a Dask DataFrame
@@ -256,31 +168,22 @@
     ----------
     path: str
         path of Delta table directory
-    version: int, default 0
+    version: int, default None
         DeltaTable Version, used for Time Travelling across the
         different versions of the parquet datasets
+    datetime: str, default None
+        Time travel the Delta table to the latest version created at or
+        before the provided `datetime` string.
+        The value should be an RFC 3339 / ISO 8601 date and time string,
+        for example:
+
+        `2018-01-26T18:30:09Z`
+        `2018-12-19T16:39:57-08:00`
+        `2018-01-26T18:30:09.453+00:00`
+
+        (format examples from the delta-rs docs)
     columns: None or list(str)
         Columns to load. If None, loads all.
-    engine : str, default 'auto'
-        Parquet reader library to use. Options include: 'auto', 'fastparquet',
-        'pyarrow', 'pyarrow-dataset', and 'pyarrow-legacy'. Defaults to 'auto',
-        which selects the FastParquetEngine if fastparquet is installed (and
-        ArrowDatasetEngine otherwise). If 'pyarrow' or 'pyarrow-dataset' is
-        specified, the ArrowDatasetEngine (which leverages the pyarrow.dataset
-        API) will be used. If 'pyarrow-legacy' is specified, ArrowLegacyEngine
-        will be used (which leverages the pyarrow.parquet.ParquetDataset API).
-        NOTE: The 'pyarrow-legacy' option (ArrowLegacyEngine) is deprecated
-        for pyarrow>=5.
-    checkpoint: int, default None
-        DeltaTable protocol aggregates the json delta logs and creates parquet
-        checkpoint for every 10 versions.This will save some IO.
-        for example:
-        if you want to read 106th version of dataset, No need to read all
-        the all the 106 json files (which isIO heavy), instead read 100th
-        parquet checkpoint which contains all the logs of 100 versions in single
-        parquet file and now read the extra 6 json files to match the given
-        106th version.
-        (so IO reduced from 106 to 7)
     storage_options : dict, default None
         Key/value pairs to be passed on to the file-system backend, if any.
     kwargs: dict,optional
@@ -290,9 +193,10 @@
     schema : pyarrow.Schema
         Used to maintain schema evolution in deltatable.
-        delta protocol stores the schema string in the json log files which is converted
-        into pyarrow.Schema and used for schema evolution (refer delta/utils.py).
-        i.e Based on particular version, some columns can be shown or not shown.
+        delta protocol stores the schema string in the json log files; it is
+        converted into a pyarrow.Schema and used for schema evolution,
+        i.e. based on the particular version, some columns may or may not
+        be present.
     filter: Union[List[Tuple[str, str, Any]], List[List[Tuple[str, str, Any]]]], default None
         List of filters to apply, like ``[[('col1', '==', 0), ...], ...]``.
@@ -310,16 +214,63 @@
     >>> df = dd.read_delta_table('s3://bucket/my-delta-table')  # doctest: +SKIP
     """
-    engine = get_engine(engine)
-    dt = DeltaTable(
+    dtw = DeltaTableWrapper(
         path=path,
         version=version,
-        checkpoint=checkpoint,
         columns=columns,
-        engine=engine,
         storage_options=storage_options,
+        datetime=datetime,
+    )
+    return dtw.read_delta_table(columns=columns, **kwargs)
+
+
+def read_delta_history(path, limit=None, storage_options=None):
+    """
+    Run the history command on the DeltaTable.
+    The operations are returned in reverse chronological order.
+
+    Reads the delta log JSON files in parallel using dask delayed and
+    gathers the list of commit_info entries (the history).
+
+    Parameters
+    ----------
+    path: str
+        path of Delta table directory
+    limit: int, default None
+        the maximum number of commit infos to return; by default the
+        complete history is returned
+
+    Returns
+    -------
+    list of the commit infos registered in the transaction log
+    """
+
+    dtw = DeltaTableWrapper(
+        path=path, version=None, columns=None, storage_options=storage_options
+    )
+    return dtw.history(limit=limit)
+
+
+def vacuum(path, retention_hours=168, dry_run=True, storage_options=None):
+    """
+    Run the Vacuum command on the Delta Table: list and delete files that
+    are no longer referenced by the Delta table and are older than the
+    retention threshold.
+
+    retention_hours: int, default 168
+        the retention threshold in hours; if None, the value from
+        `configuration.deletedFileRetentionDuration` is used, or a
+        default of one week otherwise.
+    dry_run: bool, default True
+        if True, only list the files; otherwise delete them
+
+    Returns
+    -------
+    None or list of tombstones,
+        i.e. the list of files no longer referenced by the Delta Table
+        that are older than the retention threshold.
+    """
+
+    dtw = DeltaTableWrapper(
+        path=path, version=None, columns=None, storage_options=storage_options
     )
-    # last_checkpoint_version = dt.get_checkpoint_id()
-    dt.get_pq_files_from_checkpoint_parquet()
-    dt.get_pq_files_from_delta_json_logs()
-    return dt.read_delta_table(columns=columns, **kwargs)
+    return dtw.vacuum(retention_hours=retention_hours, dry_run=dry_run)
diff --git a/dask_deltatable/utils.py b/dask_deltatable/utils.py
deleted file mode 100644
index c8f263e..0000000
--- a/dask_deltatable/utils.py
+++ /dev/null
@@ -1,135 +0,0 @@
-import json
-from typing import Union
-
-import pyarrow as pa
-
-PYARROW_CHECKPOINT_SCHEMA = [
-    "txn",
-    "add",
-    "remove",
-    "metaData",
-    "protocol",
-    "commitInfo",
-]
-FASTPARQUET_CHECKPOINT_SCHEMA = [
-    "txn.appId",
-    "txn.version",
-    "txn.lastUpdated",
-    "add.path",
-    "add.partitionValues",
-    "add.size",
-    "add.modificationTime",
-    "add.dataChange",
-    "add.stats",
-    "add.tags",
-    "remove.path",
-    "remove.deletionTimestamp",
-    "remove.dataChange",
-    "metaData.id",
-    "metaData.name",
-    "metaData.description",
-    "metaData.format.provider",
-    "metaData.format.options",
-    "metaData.schemaString",
-    "metaData.partitionColumns",
-    "metaData.configuration",
-    "metaData.createdTime",
-    "protocol.minReaderVersion",
-    "protocol.minWriterVersion",
-    "commitInfo.version",
-    "commitInfo.timestamp",
-    "commitInfo.userId",
-    "commitInfo.userName",
-    "commitInfo.operation",
-    "commitInfo.operationParameters",
-    "commitInfo.job.jobId",
-    "commitInfo.job.jobName",
-    "commitInfo.job.runId",
-    "commitInfo.job.jobOwnerId",
-    "commitInfo.job.triggerType",
-    "commitInfo.notebook.notebookId",
-    "commitInfo.clusterId",
-    "commitInfo.readVersion",
-    "commitInfo.isolationLevel",
-    "commitInfo.isBlindAppend",
-    "commitInfo.operationMetrics",
-    "commitInfo.userMetadata",
-]
-
-
-def schema_from_string(schema_string: str):
-
-    fields = []
-    schema = json.loads(schema_string)
-    for field in schema["fields"]:
-        name = field["name"]
-        type = field["type"]
-        nullable = field["nullable"]
-        metadata = field["metadata"]
-        pa_type = map_type(type)
-
-        fields.append(pa.field(name, pa_type, nullable=nullable, metadata=metadata))
-    return pa.schema(fields)
-
-
-def map_type(input_type: Union[dict, str]):
-
-    simple_type_mapping = {
-        "byte": pa.int8(),
-        "short": pa.int16(),
-        "integer": pa.int32(),
-        "long": pa.int64(),
-        "float": pa.float32(),
-        "double": pa.float64(),
-        "string": pa.string(),
-        "boolean": pa.bool_(),
-        "binary": pa.binary(),
-        "date": pa.date32(),
-        "timestamp": pa.timestamp("ns"),
-    }
-
-    # If type is string, it should be a "simple" datatype
-    if isinstance(input_type, str):
-
-        # map simple data types, that can be directly converted
-        if input_type in simple_type_mapping:
-            pa_type = simple_type_mapping[input_type]
-        else:
-            raise TypeError(
-                f"Got type unsupported {input_type} when trying to parse schema"
-            )
-
-    # nested field needs special handling
-    else:
-        if input_type["type"] == "array":
-            # map list type to pyarrow types
-            element_type = map_type(input_type["elementType"])
-            # pass a field as the type to the list with a name of "element".
-            # This is just to comply with the way pyarrow creates lists when infering schemas
-            pa_field = pa.field("element", element_type)
-            pa_type = pa.list_(pa_field)
-
-        elif input_type["type"] == "map":
-            key_type = map_type(input_type["keyType"])
-            item_type = map_type(input_type["valueType"])
-            pa_type = pa.map_(key_type, item_type)
-
-        elif input_type["type"] == "struct":
-            fields = []
-            for field in input_type["fields"]:
-                name = field["name"]
-                input_type = field["type"]
-                nullable = field["nullable"]
-                metadata = field["metadata"]
-                field_type = map_type(input_type)
-
-                fields.append(
-                    pa.field(name, field_type, nullable=nullable, metadata=metadata)
-                )
-            pa_type = pa.struct(fields)
-
-        else:
-            raise TypeError(
-                f"Got type unsupported {input_type} when trying to parse schema"
-            )
-    return pa_type
diff --git a/requirements.txt b/requirements.txt
index 5091375..b095396 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,2 +1,3 @@
 dask[dataframe,distributed]
 pyarrow
+deltalake
diff --git a/tests/data/simple2.zip b/tests/data/simple2.zip
new file mode 100644
index 0000000..3f6aa35
Binary files /dev/null and b/tests/data/simple2.zip differ
diff --git a/tests/data/vacuum.zip b/tests/data/vacuum.zip
new file mode 100644
index 0000000..fbb5589
Binary files /dev/null and b/tests/data/vacuum.zip differ
diff --git a/tests/test_core.py b/tests/test_core.py
index 37d9d13..c825caa 100644
--- a/tests/test_core.py
+++ b/tests/test_core.py
@@ -1,19 +1,26 @@
+import glob
+import os
 import zipfile
-from io import BytesIO
 
 import pytest
 
 import dask_deltatable as ddt
 
-requests = pytest.importorskip("requests")
-
 
 @pytest.fixture()
 def simple_table(tmpdir):
     output_dir = tmpdir
     deltaf = zipfile.ZipFile("tests/data/simple.zip")
     deltaf.extractall(output_dir)
-    return output_dir + "/test1/"
+    return str(output_dir) + "/test1/"
+
+
+@pytest.fixture()
+def simple_table2(tmpdir):
+    output_dir = tmpdir
+    deltaf = zipfile.ZipFile("tests/data/simple2.zip")
+    deltaf.extractall(output_dir)
+    return str(output_dir) + "/simple_table/"
 
 
 @pytest.fixture()
@@ -21,7 +28,7 @@ def partition_table(tmpdir):
     output_dir = tmpdir
     deltaf = zipfile.ZipFile("tests/data/partition.zip")
     deltaf.extractall(output_dir)
-    return output_dir + "/test2/"
+    return str(output_dir) + "/test2/"
 
 
 @pytest.fixture()
@@ -29,7 +36,7 @@ def empty_table1(tmpdir):
     output_dir = tmpdir
     deltaf = zipfile.ZipFile("tests/data/empty1.zip")
     deltaf.extractall(output_dir)
-    return output_dir + "/empty/"
+    return str(output_dir) + "/empty/"
 
 
 @pytest.fixture()
@@ -37,7 +44,7 @@ def empty_table2(tmpdir):
     output_dir = tmpdir
     deltaf = zipfile.ZipFile("tests/data/empty2.zip")
     deltaf.extractall(output_dir)
-    return output_dir + "/empty2/"
+    return str(output_dir) + "/empty2/"
 
 
 @pytest.fixture()
@@ -45,7 +52,15 @@ def checkpoint_table(tmpdir):
     output_dir = tmpdir
     deltaf = zipfile.ZipFile("tests/data/checkpoint.zip")
     deltaf.extractall(output_dir)
-    return output_dir + "/checkpoint/"
+    return str(output_dir) + "/checkpoint/"
+
+
+@pytest.fixture()
+def vacuum_table(tmpdir):
+    output_dir = tmpdir
+    deltaf = zipfile.ZipFile("tests/data/vacuum.zip")
+    deltaf.extractall(output_dir)
+    return str(output_dir) + "/vaccum_table"
 
 
 def test_read_delta(simple_table):
@@ -56,6 +71,7 @@
 
 
 def test_read_delta_with_different_versions(simple_table):
+    print(simple_table)
     df = ddt.read_delta_table(simple_table, version=0)
     assert df.compute().shape == (100, 3)
@@ -117,7 +133,7 @@ def test_checkpoint(checkpoint_table):
     df = ddt.read_delta_table(checkpoint_table, checkpoint=20, version=22)
     assert df.compute().shape[0] == 115
 
-    with pytest.raises(ValueError):
+    with pytest.raises(Exception):
         # Parquet file with the given checkpoint 30 does not exists:
         # File {checkpoint_path} not found"
         _ = ddt.read_delta_table(checkpoint_table, checkpoint=30, version=33)
@@ -126,5 +142,80 @@
 def test_out_of_version_error(simple_table):
     # Cannot time travel Delta table to version 4 , Available versions for given
     # checkpoint 0 are [0,1]
-    with pytest.raises(ValueError):
+    with pytest.raises(Exception):
         _ = ddt.read_delta_table(simple_table, version=4)
+
+
+def test_load_with_datetime(simple_table2):
+    log_dir = f"{simple_table2}_delta_log"
+    log_mtime_pair = [
+        ("00000000000000000000.json", 1588398451.0),
+        ("00000000000000000001.json", 1588484851.0),
+        ("00000000000000000002.json", 1588571251.0),
+        ("00000000000000000003.json", 1588657651.0),
+        ("00000000000000000004.json", 1588744051.0),
+    ]
+    for file_name, dt_epoch in log_mtime_pair:
+        file_path = os.path.join(log_dir, file_name)
+        os.utime(file_path, (dt_epoch, dt_epoch))
+
+    expected = ddt.read_delta_table(simple_table2, version=0).compute()
+    result = ddt.read_delta_table(
+        simple_table2, datetime="2020-05-01T00:47:31-07:00"
+    ).compute()
+    assert expected.equals(result)
+    # assert_frame_equal(expected,result)
+
+    expected = ddt.read_delta_table(simple_table2, version=1).compute()
+    result = ddt.read_delta_table(
+        simple_table2, datetime="2020-05-02T22:47:31-07:00"
+    ).compute()
+    assert expected.equals(result)
+
+    expected = ddt.read_delta_table(simple_table2, version=4).compute()
+    result = ddt.read_delta_table(
+        simple_table2, datetime="2020-05-25T22:47:31-07:00"
+    ).compute()
+    assert expected.equals(result)
+
+
+def test_read_history(checkpoint_table):
+    history = ddt.read_delta_history(checkpoint_table)
+    assert len(history) == 26
+
+    last_commit_info = history[0]
+    last_commit_info == {
+        "timestamp": 1630942389906,
+        "operation": "WRITE",
+        "operationParameters": {"mode": "Append", "partitionBy": "[]"},
+        "readVersion": 24,
+        "isBlindAppend": True,
+        "operationMetrics": {
+            "numFiles": "6",
+            "numOutputBytes": "5147",
+            "numOutputRows": "5",
+        },
+    }
+
+    # check whether the logs are sorted
+    current_timestamp = history[0]["timestamp"]
+    for h in history[1:]:
+        assert current_timestamp > h["timestamp"], "History Not Sorted"
+        current_timestamp = h["timestamp"]
+
+    history = ddt.read_delta_history(checkpoint_table, limit=5)
+    assert len(history) == 5
+
+
+def test_vacuum(vacuum_table):
+    print(vacuum_table)
+    print(os.listdir(vacuum_table))
+    tombstones = ddt.vacuum(vacuum_table, dry_run=True)
+    print(tombstones)
+    assert len(tombstones) == 4
+
+    before_pq_files_len = len(glob.glob(f"{vacuum_table}/*.parquet"))
+    assert before_pq_files_len == 7
+    tombstones = ddt.vacuum(vacuum_table, dry_run=False)
+    after_pq_files_len = len(glob.glob(f"{vacuum_table}/*.parquet"))
+    assert after_pq_files_len == 3
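
For anyone trying this branch out, here is a minimal, hypothetical sketch of how the public API touched by this diff (`read_delta_table` with `datetime=`, `read_delta_history`, and `vacuum`) could be exercised against a remote table. The bucket name and credential values are placeholders, and the `key`/`secret` entries in `storage_options` follow the usual fsspec/s3fs convention rather than anything defined in this patch.

```
import dask_deltatable as ddt

# placeholder credentials, passed through to the fsspec/s3fs backend
storage_options = {"key": "<aws-access-key-id>", "secret": "<aws-secret-access-key>"}

# time travel either by version or by an RFC 3339 / ISO 8601 datetime string
df = ddt.read_delta_table(
    "s3://example-bucket/delta_path",
    datetime="2018-12-19T16:39:57-08:00",
    storage_options=storage_options,
)
print(df.compute().shape)

# commit history, newest first, limited to the last five commits
for commit_info in ddt.read_delta_history(
    "s3://example-bucket/delta_path", limit=5, storage_options=storage_options
):
    if commit_info:  # log entries without commitInfo come back as None
        print(commit_info["timestamp"], commit_info["operation"])

# list the stale files first (dry_run=True), then actually delete them
tombstones = ddt.vacuum(
    "s3://example-bucket/delta_path", dry_run=True, storage_options=storage_options
)
print(f"{len(tombstones)} files would be removed")
ddt.vacuum("s3://example-bucket/delta_path", dry_run=False, storage_options=storage_options)
```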