diff --git a/.bumpversion.cfg b/.bumpversion.cfg new file mode 100644 index 0000000..78a52d9 --- /dev/null +++ b/.bumpversion.cfg @@ -0,0 +1,10 @@ +[bumpversion] +current_version = 0.1.2 +tag_name = {new_version} +commit = True +tag = True + +[bumpversion:file:setup.py] +search = version='{current_version}' +replace = version='{new_version}' + diff --git a/CHANGES.md b/CHANGES.md new file mode 100644 index 0000000..461cc47 --- /dev/null +++ b/CHANGES.md @@ -0,0 +1,13 @@ +# changelog + +## 0.1.2 + +- Add pagination by offset logic +- Allow lambda functions as strings in yaml for column transformation and df operations +- Add option in yaml to set cache header for google cloud storage blobs +- Small bugfixes + + +## 0.1.1 + +- first release with basic functionality diff --git a/README.md b/README.md index 3c5d6c7..4dde645 100644 --- a/README.md +++ b/README.md @@ -53,7 +53,7 @@ store this as a file, and set the env var `CONFIG` to the path: ```python from runpandarun.datasets import my_dataset, another_dataset -df = my_dataset.df +df = my_dataset.get_df() df['name'].plot.hist() another_dataset.daily.mean().plot() # some handy shorthands for pandas @@ -74,7 +74,7 @@ store = Datastore() store.update() # save a revision -df = my_dataset.df +df = my_dataset.get_df() df = wrangle(df) my_dataset.save(df, 'wrangled') @@ -82,7 +82,7 @@ my_dataset.save(df, 'wrangled') df = my_dataset['wrangled'] # publish -df = my_dataset.df +df = my_dataset.get_df() clean(df) my_dataset.publish(df, name='cleaned', overwrite=True) ``` @@ -340,7 +340,7 @@ store = Datastore(config) ds = store.my_dataset # all your datasets have their computed (according to your config) `pandas.DataFrame` as attribute: -df = store.my_dataset.df +df = store.my_dataset.get_df() # get combined df (if specified in the config) df = store.combined @@ -387,9 +387,9 @@ After the workflow is done, you can publish some (or all) results. ```python dataset = store.my_dataset -df1 = do_something_with(dataset.df) +df1 = do_something_with(dataset.get_df()) dataset.publish(df1, overwrite=True, include_source=True) -df2 = do_something_else_with(dataset.df) +df2 = do_something_else_with(dataset.get_df()) dataset.publish(df2, name='filtered_for_europe', format='json') ``` @@ -451,7 +451,7 @@ for, right? 
```python ds = store.my_dataset -df = ds.df +df = ds.get_df() ds.revisions.save('tansformed', df.T) ``` diff --git a/README.rst b/README.rst index 989b4a8..ac229fd 100644 --- a/README.rst +++ b/README.rst @@ -60,7 +60,7 @@ store this as a file, and set the env var ``CONFIG`` to the path: from runpandarun.datasets import my_dataset, another_dataset - df = my_dataset.df + df = my_dataset.get_df() df['name'].plot.hist() another_dataset.daily.mean().plot() # some handy shorthands for pandas @@ -81,7 +81,7 @@ Handle data persistence and state of datasets: store.update() # save a revision - df = my_dataset.df + df = my_dataset.get_df() df = wrangle(df) my_dataset.save(df, 'wrangled') @@ -89,7 +89,7 @@ Handle data persistence and state of datasets: df = my_dataset['wrangled'] # publish - df = my_dataset.df + df = my_dataset.get_df() clean(df) my_dataset.publish(df, name='cleaned', overwrite=True) @@ -377,7 +377,7 @@ analysis scripts and focus on the analysis itself… ds = store.my_dataset # all your datasets have their computed (according to your config) `pandas.DataFrame` as attribute: - df = store.my_dataset.df + df = store.my_dataset.get_df() # get combined df (if specified in the config) df = store.combined @@ -417,9 +417,9 @@ After the workflow is done, you can publish some (or all) results. .. code:: python dataset = store.my_dataset - df1 = do_something_with(dataset.df) + df1 = do_something_with(dataset.get_df()) dataset.publish(df1, overwrite=True, include_source=True) - df2 = do_something_else_with(dataset.df) + df2 = do_something_else_with(dataset.get_df()) dataset.publish(df2, name='filtered_for_europe', format='json') For behaviour reasons the ``overwrite``-flag must be set explicitly @@ -481,7 +481,7 @@ workflow is for, right? .. code:: python ds = store.my_dataset - df = ds.df + df = ds.get_df() ds.revisions.save('tansformed', df.T) **load a revision** diff --git a/example/config.yml b/example/config.yml index 2252f43..0a8f962 100644 --- a/example/config.yml +++ b/example/config.yml @@ -34,6 +34,9 @@ datasets: - state - date dt_index: date + ops: + - applymap: + func: 'lambda x: x.lower() if isinstance(x, str) else x' a_local_json: json_local: ./example/testdata.json copy: true @@ -42,6 +45,7 @@ datasets: integer: str columns: - value: integer - - state + - state: + map: 'lambda x: x.upper()' - date dt_index: date diff --git a/example/rki_json.yml b/example/rki_json.yml new file mode 100644 index 0000000..9326c7e --- /dev/null +++ b/example/rki_json.yml @@ -0,0 +1,30 @@ +storage: + filesystem: + enabled: true + data_root: datastore-testdata/example +datasets: + rki_json: + json_url: https://services7.arcgis.com/mOBPykOjAyBO2ZKk/arcgis/rest/services/RKI_COVID19/FeatureServer/0/query?where=1%3D1&outFields=*&outSR=4326&f=json + json_normalize: + record_path: features + paginate: + offset: + param: resultOffset + get_offset_value: 'lambda res: len(res.json()["features"])' + columns: + - date: attributes.Meldedatum + - last_updated: attributes.Datenstand + - report_id: attributes.ObjectId + - gender: attributes.Geschlecht + - age: attributes.Altersgruppe + - value: attributes.AnzahlFall + - death_value: attributes.AnzahlTodesfall + - new_case: attributes.NeuerFall + - new_death_case: attributes.NeuerTodesfall + - state_id: attributes.IdBundesland + - state: attributes.Bundesland + - district_id: attributes.IdLandkreis + - district: attributes.Landkreis + incremental: true + dt_index: + column: date diff --git a/runpandarun/columns.py b/runpandarun/columns.py index 74280e3..6e3eae2 100644 
--- a/runpandarun/columns.py +++ b/runpandarun/columns.py @@ -1,22 +1,36 @@ +import banal + from .exceptions import ConfigError +from .util import safe_eval def wrangle_columns(df, config): use_columns = [] rename_columns = {} + map_funcs = {} for column in config.columns: if isinstance(column, str): use_columns.append(column) - elif isinstance(column, dict): + elif banal.is_mapping(column): if len(column) > 1: raise ConfigError(f'Column config `{column}` has errors.') target, source = list(column.items())[0] - use_columns.append(source) - rename_columns[source] = target + if banal.is_mapping(source): + source_column = source.get('column', target) + map_func = source.get('map') + if map_func: + map_funcs[target] = safe_eval(map_func) + else: + source_column = source + use_columns.append(source_column) + rename_columns[source_column] = target else: raise ConfigError(f'Column config `{column}` has errors.') df = df[use_columns] if rename_columns: df = df.rename(columns=rename_columns) + if map_funcs: + for col, func in map_funcs.items(): + df[col] = df[col].map(func) return df diff --git a/runpandarun/combine.py b/runpandarun/combine.py index 8ef896a..4bdebf6 100644 --- a/runpandarun/combine.py +++ b/runpandarun/combine.py @@ -23,6 +23,6 @@ def concat_long(dfs): def concat_wide(datasets): - dfs = (ds.df.rename(columns={c: f'{ds.name}.{c}' for c in ds.df.columns}) for ds in datasets) + dfs = (ds._df.rename(columns={c: f'{ds.name}.{c}' for c in ds._df.columns}) for ds in datasets) df = pd.concat(dfs, axis=1) return df diff --git a/runpandarun/dataset.py b/runpandarun/dataset.py index 4b522fe..0e9273c 100644 --- a/runpandarun/dataset.py +++ b/runpandarun/dataset.py @@ -50,8 +50,6 @@ def __init__(self, name, config, store): self.config = Config({**DEFAULT_CONFIG, **config}) self.store = store self._storage = DatasetStorage(name, config, store._storage) - self._base_df = None - self._df = None self.revisions = DatasetRevisions(self) # provide handy pd shortcuts @@ -72,7 +70,7 @@ def __setitem__(self, name, item): self.revisions.save(name, item) @cached_property - def df(self): + def _df(self): return self.get_df() @cached_property @@ -107,7 +105,7 @@ def load(self): def publish(self, df=None, **kwargs): if df is None: - df = self.df + df = self._df config = self.config.update({'publish': self.store.config.publish or {}}) # FIXME hrmpf return publish.publish(self, df, config, **kwargs) @@ -130,12 +128,12 @@ def resample(self, interval, method): if method not in RESAMPLE_METHODS.keys(): raise ConfigError(f'Resampling method `{method}` not valid.') # noqa if method == 'count': # FIXME implementation? 
- df = self.df.copy() + df = self._df.copy() df['count'] = 1 return df.resample(interval)[['count']].count() - return self.df[self.numeric_cols()].resample(interval).apply(RESAMPLE_METHODS[method]) + return self._df[self.numeric_cols()].resample(interval).apply(RESAMPLE_METHODS[method]) def numeric_cols(self): - for col in self.df.columns: - if pd.api.types.is_numeric_dtype(self.df[col]): + for col in self._df.columns: + if pd.api.types.is_numeric_dtype(self._df[col]): yield col diff --git a/runpandarun/fetch.py b/runpandarun/fetch.py new file mode 100644 index 0000000..ec77cb9 --- /dev/null +++ b/runpandarun/fetch.py @@ -0,0 +1,19 @@ +from .exceptions import ConfigError +from .util import ensure_singlekey_dict, safe_eval + + +def paginate_offset(get_request, offset_param, get_offset_value, offset=0): + res = get_request(**{offset_param: offset}) + new_offset = offset + get_offset_value(res) + if new_offset > offset: + yield res + yield from paginate_offset(get_request, offset_param, get_offset_value, new_offset) + + +def paginate(get_request, config): + method, config = ensure_singlekey_dict(config) + if method is None: + raise ConfigError(f'Please make sure {config} is properly configured as single-key dict!') + if method != 'offset': + raise ConfigError(f'Other pagination method than `{method}` currently not registered') + yield from paginate_offset(get_request, config['param'], safe_eval(config['get_offset_value'])) diff --git a/runpandarun/load.py b/runpandarun/load.py index 4022559..0b75757 100644 --- a/runpandarun/load.py +++ b/runpandarun/load.py @@ -10,7 +10,7 @@ def _load_csv(source, config): def load_csv(source, config): - if config.incremental: + if config.incremental or config.paginate: return pd.concat(_load_csv(s, config.read or {}) for s in source) return _load_csv(source, config.read or {}) @@ -25,6 +25,6 @@ def _load_json(source, config): def load_json(source, config): - if config.incremental: + if config.incremental or config.paginate: return pd.concat(_load_json(s, config) for s in source) return _load_json(source, config) diff --git a/runpandarun/ops.py b/runpandarun/ops.py index 68759f7..c0bbc28 100644 --- a/runpandarun/ops.py +++ b/runpandarun/ops.py @@ -2,6 +2,7 @@ from pandas import DataFrame from .exceptions import ConfigError +from .util import safe_eval def apply_ops(df, ops): @@ -17,7 +18,7 @@ def apply_ops(df, ops): op_args = list(op.values()) if len(op_args) > 1: raise ConfigError(f'Operation arguments not valid: {op_args} - should be only 1 mapping item.') - op_args = op_args[0] + op_args = {k: safe_eval(v) if k == 'func' else v for k, v in op_args[0].items()} func = getattr(DataFrame, op_name, None) if func is None or not callable(func): raise ConfigError(f'{op} is not a valid opration for `pd.DataFrame`') diff --git a/runpandarun/publish.py b/runpandarun/publish.py index fc686ee..74603d9 100644 --- a/runpandarun/publish.py +++ b/runpandarun/publish.py @@ -3,6 +3,8 @@ from datetime import datetime from urllib.parse import urljoin +from .exceptions import ConfigError +from .ops import apply_ops from .storage import get_backend @@ -10,15 +12,15 @@ class Handler: def __init__(self, dataset, df, config, backend): self.enabled = banal.as_bool(config.enabled) self.dataset = dataset - self.df = df self.config = config self.backend = backend self.name = config.get('name', dataset.name) self.format = config.get('format', dataset.format) self.overwrite = config.get('overwrite') self.with_timestamp = config.get('with_timestamp') - self.dump = getattr(df, 
f'to_{self.format}') self.base_path = self.get_base_path() + df = apply_ops(df, config.get('clean', {})) + self.dump = getattr(df, f'to_{self.format}') def get_base_path(self): return self.backend.get_path(self.dataset.name) @@ -52,6 +54,8 @@ def get_file_name(self): def _publish(dataset, df, config, **kwargs): + if config.publish is None: + raise ConfigError('Add a publish handler config to be able to publish datasets.') for handler, handler_config in config.publish['handlers'].items(): if banal.as_bool(handler_config.get('enabled', True)): backend = get_backend(handler, handler_config) diff --git a/runpandarun/storage/gcloud.py b/runpandarun/storage/gcloud.py index 1179fe9..f3b3466 100644 --- a/runpandarun/storage/gcloud.py +++ b/runpandarun/storage/gcloud.py @@ -43,15 +43,18 @@ def exists(self, path): def store(self, path, content, publish=False): ext = os.path.splitext(path)[1] - if ext == 'csv': + if ext == '.csv': content_type = 'text/csv' - elif ext == 'json': + elif ext == '.json': content_type = 'application/json' else: content_type = 'text/plain' blob = self._get_blob(path) - blob.upload_from_string(content, content_type) + blob.upload_from_string(content, content_type=content_type) if publish: + if self.config.cache_control: + blob.cache_control = self.config.cache_control + blob.patch() blob.make_public() return blob.public_url return self.get_path(path) diff --git a/runpandarun/storage/storage.py b/runpandarun/storage/storage.py index f5b0433..fd3dd91 100644 --- a/runpandarun/storage/storage.py +++ b/runpandarun/storage/storage.py @@ -7,6 +7,7 @@ from ..config import Config from ..exceptions import FetchError +from ..fetch import paginate from ..util import cached_property, make_key @@ -73,7 +74,7 @@ def get_source(self, update=False, version='newest'): versions = self.backend.get_children(self._fp('data')) versions = sorted([v for _, v in versions]) - if self.config.incremental is True: + if self.config.incremental or self.config.paginate: # concat all the versions return self.get_incremental_sources(versions) @@ -103,21 +104,16 @@ def fetch(self, store=True): """fetch a dataset source and store it on disk""" content = self.get_remote_content() if content: - if store: # still only store if newer file is different - key = make_key(content, hash=True) - last_key = self.backend.get_value(self._fp('last_update_key')) - if last_key != key: - ts = datetime.utcnow().isoformat() - fp = 'data/data.%s.%s' % (ts, self.format) - self.backend.store(self._fp(fp), content) - self.backend.set_value(self._fp('last_update_key'), key) - self.set_ts('last_update') - self.storage.set_ts('last_update') - return + if store: + if self.config.paginate: + return self.store_paginated(content) + return self.store(content) raise FetchError(f'Could not fetch source data for dataset `{self.name}`.') def get_remote_content(self): if self.is_remote: + if self.config.paginate: + return paginate(self.get_request, self.config.paginate) res = self.get_request() if res.ok: return res.text @@ -143,12 +139,34 @@ def should_store(self): if self.is_local: return banal.as_bool(self.config.copy) - def get_request(self): + def get_request(self, **params): url = self.url - params = self.config.get('request').get('params') + params = {**self.config.get('request').get('params', {}), **params} headers = self.config.get('request').get('headers') return requests.get(url, params=params, headers=headers) + def store(self, content, page=None): + # still only store if newer file is different + key_name = 'last_update_key' + fp = 
'data/data.%s.%s' + if page is not None: + key_name += f'--{page}' + fp = f'data/data--{page}.%s.%s' + last_key = self.backend.get_value(self._fp(key_name)) + + key = make_key(content, hash=True) + if last_key != key: + ts = datetime.utcnow().isoformat() + fp = fp % (ts, self.format) + self.backend.store(self._fp(fp), content) + self.backend.set_value(self._fp(key_name), key) + self.set_ts('last_update') + self.storage.set_ts('last_update') + + def store_paginated(self, results): + for page, res in enumerate(results): + self.store(res.text, page) + @cached_property def is_csv(self): return any((self.config.get('csv_url'), self.config.get('csv_local'))) diff --git a/runpandarun/store.py b/runpandarun/store.py index 1af6db0..072e73f 100644 --- a/runpandarun/store.py +++ b/runpandarun/store.py @@ -68,7 +68,7 @@ def combined(self): - merge on index """ datasets = [ds for ds in self if ds.name in self._combine] - dfs = [ds.df for ds in datasets] + dfs = [ds._df for ds in datasets] # concat long if combine.test_index_name_equal(dfs) and combine.test_columns_equal(dfs): return combine.concat_long(dfs) diff --git a/runpandarun/util.py b/runpandarun/util.py index 348a344..152bfd8 100644 --- a/runpandarun/util.py +++ b/runpandarun/util.py @@ -1,6 +1,9 @@ +import banal import os import sys import hashlib +import pandas as pd +import numpy as np from multiprocessing import Pool, cpu_count from slugify import slugify as _slugify @@ -158,3 +161,31 @@ def make_key(*criteria, hash=None, clean=False): return key m.update(key.encode('utf-8')) return m.hexdigest() + + +def safe_eval(value): + return eval(value, {'__builtins__': { + 'pd': pd, + 'np': np, + 'str': str, + 'int': int, + 'float': float, + 'dict': dict, + 'list': list, + 'tuple': tuple, + 'None': None, + 'True': True, + 'False': False, + 'len': len, + 'hasattr': hasattr, + 'getattr': getattr, + 'isinstance': isinstance + }}) + + +def ensure_singlekey_dict(data): + # validate that data is a dict with only 1 key and return key, data[key] + if banal.is_mapping(data): + if len(data) == 1: + return list(data.items())[0] + return None, None diff --git a/setup.py b/setup.py index 8193f80..1299109 100644 --- a/setup.py +++ b/setup.py @@ -8,7 +8,7 @@ def readme(): setup( name='runpandarun', - version='0.1.1', + version='0.1.2', description='A simple toolkit for managing data from different sources.', long_description=readme(), classifiers=[ diff --git a/tests/test_datastore.py b/tests/test_datastore.py index 5d8f84d..c0b68f0 100644 --- a/tests/test_datastore.py +++ b/tests/test_datastore.py @@ -44,19 +44,19 @@ def test_datasets(self): self.assertIsInstance(dataset, Dataset) def test_df(self): - df = self.store.datasets[0].df + df = self.store.datasets[0].get_df() self.assertIsInstance(df, pd.DataFrame) self.assertEqual('id', df.index.name) def test_json(self): ds = self.store.a_local_json self.assertTrue(ds.config.dt_index) - df = ds.df + df = ds.get_df() self.assertIsInstance(df, pd.DataFrame) self.assertEqual('date', df.index.name) def test_dtindex(self): - df = self.store.a_local_csv.df + df = self.store.a_local_csv.get_df() self.assertIsInstance(df.index, pd.DatetimeIndex) def test_resampling(self): @@ -77,8 +77,8 @@ def test_resampling(self): self.assertEqual(list(df.columns), ['count']) def test_combine_long(self): - df1 = self.store.a_local_csv.df - df2 = self.store.a_local_json.df + df1 = self.store.a_local_csv.get_df() + df2 = self.store.a_local_json.get_df() combined = self.store.combined self.assertSetEqual(set(combined.columns), 
set(df1.columns)) self.assertEqual(len(df1) + len(df2), len(combined)) @@ -111,9 +111,9 @@ def test_combine_wide(self): """ store = Datastore(config) store.update() - df1 = store.a_local_csv.df + df1 = store.a_local_csv.get_df() df1 = df1.rename(columns={c: f'a_local_csv.{c}' for c in df1.columns}) - df2 = store.same_but_different.df + df2 = store.same_but_different.get_df() df2 = df2.rename(columns={c: f'same_but_different.{c}' for c in df2.columns}) combined = store.combined self.assertEqual(len(df1), len(combined)) @@ -142,19 +142,19 @@ def test_incremental(self): store = Datastore(config) ds = store.my_dataset self.assertTrue(ds.config.incremental) - items = len(ds.df) + items = len(ds.get_df()) ds = ds.update() - self.assertGreater(len(ds.df), items) - self.assertEqual(len(ds.df), items*2) + self.assertGreater(len(ds.get_df()), items) + self.assertEqual(len(ds.get_df()), items*2) config = store.config.to_dict() del config['datasets']['my_dataset']['ops'] # enable default ops with drop_duplicates store = Datastore(config) ds = store.my_dataset self.assertTrue(ds.config.incremental) - items = len(ds.df) + items = len(ds.get_df()) ds = ds.update() - self.assertEqual(len(ds.df), items) + self.assertEqual(len(ds.get_df()), items) def test_ops(self): config = """ @@ -163,25 +163,37 @@ def test_ops(self): data_root: datastore-testdata/test_incremental datasets: my_dataset: - csv_url: https://docs.google.com/spreadsheets/d/e/2PACX-1vRhzhiVJr0XPcMANnb9_F7bcE6h-C5826MGJs034AocLpyo4uy0y97LIG2ns8F1heCrSTsyEkL1XwDK/pub?output=csv # noqa - columns: - - id: identifier - - value - - date + csv_local: ./example/testdata.csv + dt_index: date """ store = Datastore(config) ds = store.datasets[0] self.assertIsInstance(ds.config.ops, list) # base ops config = store.config.to_dict() config['datasets']['my_dataset']['ops'] = [ - {'sort_values': {'ascending': False, 'by': 'value'}}, - {'fillna': {'value': ''}} + {'sort_values': {'ascending': False, 'by': 'state'}}, + {'fillna': {'value': ''}}, + {'applymap': {'func': 'lambda x: x.lower() if isinstance(x, str) else x'}} + ] + store = Datastore(config) + ds = store.datasets[0] + df = ds.get_df() + self.assertTrue(all(df['state'].map(lambda x: x.islower()))) + + # unsafe eval raise + config['datasets']['my_dataset']['ops'] = [ + {'applymap': {'func': "__import__('os').system('rm -rf /tmp/still-dont-be-too-risky-in-this-test')"}} ] store = Datastore(config) ds = store.datasets[0] - ds.df + self.assertRaises(NameError, ds.get_df) def test_json_dtype(self): store = self.store - df = store.a_local_json.df + df = store.a_local_json.get_df() self.assertTrue(df['value'].dtype.name == 'object') + + def test_columns_map(self): + ds = self.store.a_local_json + df = ds.get_df() + self.assertTrue(all(df['state'].map(lambda x: x.isupper()))) diff --git a/tests/test_fetch.py b/tests/test_fetch.py index 0cde738..73de2bb 100644 --- a/tests/test_fetch.py +++ b/tests/test_fetch.py @@ -19,3 +19,13 @@ def test_dedup_update_by_hash(self): # update again will not store the source, bc it didn't change: ds.update() self.assertListEqual(ds._storage.backend.get_children(f'{ds.name}/data'), raw_files) + + def test_paginate_by_offset(self): + # FIXME this test might fail over time bc. 
the remote dataset might change / removed at one point + store = Datastore('./example/rki_json.yml') + ds = store.rki_json + ds.update() + raw_files = ds._storage.backend.get_children(f'{ds.name}/data') + self.assertGreater(len(raw_files), 1) + # just make sure nothing breaks + df = ds.get_df() # noqa diff --git a/tests/test_gcloud.py b/tests/test_gcloud.py index 93d99f8..9d93ac4 100644 --- a/tests/test_gcloud.py +++ b/tests/test_gcloud.py @@ -4,6 +4,7 @@ import banal import os +import requests import unittest import pandas as pd @@ -33,12 +34,12 @@ def tearDown(self): os.environ['GOOGLE_PUBLISH_ENABLED'] = '0' os.environ['GOOGLE_ENABLED'] = '0' # delete created google buckets - # for bucket in (self.publish_config['bucket'], self.storage_config['bucket']): - # try: - # bucket = self.client.get_bucket(bucket) - # bucket.delete() - # except NotFound: - # pass + for bucket in (self.publish_config['bucket'], self.storage_config['bucket']): + try: + bucket = self.client.get_bucket(bucket) + bucket.delete(force=True) + except NotFound: + pass def test_1config(self): self.assertTrue(banal.as_bool(self.publish_config['enabled'])) @@ -78,3 +79,13 @@ def test_gcloud_publish(self): res = ds.publish(overwrite=True) self.assertEqual(url, res[0]) + + def test_gcloud_cache(self): + config = self.store.config.to_dict().copy() + config['publish']['handlers']['gcloud']['cache_control'] = 'no-cache' + store = Datastore(self.store.config.update(config)) + dataset = store.datasets[0] + dataset.publish() + url = 'https://runpandarun-testbucket-publish.storage.googleapis.com/my_dataset/my_dataset.csv' + res = requests.get(url) + self.assertEqual(res.headers['cache-control'], 'no-cache') diff --git a/tests/test_json.py b/tests/test_json.py index 3934b5b..ae50cce 100644 --- a/tests/test_json.py +++ b/tests/test_json.py @@ -15,10 +15,10 @@ def test_json(self): ds = self.dataset data = self.data self.assertDictEqual(data, json.loads(ds._storage.get_source())) - self.assertEqual(len(data['features']), len(ds.df)) + self.assertEqual(len(data['features']), len(ds.get_df())) def test_remote_json(self): # TODO this test will fail sooner or later... 
store = Datastore('./example/rki_remote.yml') ds = store.rki - self.assertEqual(len(self.data['features']), len(ds.df)) + self.assertEqual(len(self.data['features']), len(ds.get_df())) diff --git a/tests/test_publish.py b/tests/test_publish.py index f980b3f..6ca12e6 100644 --- a/tests/test_publish.py +++ b/tests/test_publish.py @@ -19,7 +19,7 @@ def test_filesystem_publish(self): self.assertIn(self.config['data_root'], fp) self.assertTrue(os.path.isfile(fp)) df = pd.read_csv(fp, index_col='id') - self.assertTrue(df.equals(ds.df)) + self.assertTrue(df.equals(ds.get_df())) # overwrite self.assertRaises(FileExistsError, ds.publish) @@ -35,7 +35,7 @@ def test_filesystem_publish(self): self.assertEqual(ds._storage.get_source(), source_content) # different name and wrangled df - df = ds.df.T + df = ds.get_df().T fp = ds.publish(df, name='transformed')[0] self.assertTrue(os.path.isfile(fp)) self.assertIn('transformed', fp) diff --git a/tests/test_revisions.py b/tests/test_revisions.py index ff643f6..56dbd3e 100644 --- a/tests/test_revisions.py +++ b/tests/test_revisions.py @@ -17,14 +17,14 @@ def _fp(self, name): def test_revisions(self): ds = self.ds self.assertIsInstance(ds.revisions, DatasetRevisions) - ds.revisions.save('transformed', ds.df.T) + ds.revisions.save('transformed', ds.get_df().T) self.assertIn('transformed', ds.revisions) self.assertIn('transformed', ds.revisions.list()) self.assertTrue(os.path.isfile(self._fp('transformed'))) self.assertTrue(ds['transformed'].equals(ds.revisions['transformed'])) rev = ds['transformed'] - self.assertTrue(rev.equals(ds.df.T)) + self.assertTrue(rev.equals(ds.get_df().T)) # store other stuff (anything that pickle can handle) now = datetime.now()
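
Taken together, the changes above add offset pagination, lambda strings in YAML (for per-column `map` functions and for `ops` such as `applymap`), a `cache_control` option for Google Cloud blobs, and the explicit `get_df()` accessor that replaces the old `.df` property. Below is a minimal usage sketch assembled from `example/config.yml`, `example/rki_json.yml` and the tests in this diff; the top-level `Datastore` import, the `data_root` path and the dataset names are illustrative assumptions rather than anything prescribed by the change:

```python
# Sketch of the 0.1.2 features combined. The import path, data_root and dataset
# names are illustrative; the config keys mirror example/config.yml and
# example/rki_json.yml from this diff.
from runpandarun import Datastore

config = """
storage:
  filesystem:
    enabled: true
    data_root: datastore-testdata/sketch
datasets:
  a_local_csv:
    csv_local: ./example/testdata.csv
    columns:
      - state:
          map: 'lambda x: x.upper()'   # per-column lambda string (new in 0.1.2)
      - date
    dt_index: date
    ops:
      - applymap:
          func: 'lambda x: x.lower() if isinstance(x, str) else x'   # frame-wide op, string goes through safe_eval
  rki_json:
    json_url: https://services7.arcgis.com/mOBPykOjAyBO2ZKk/arcgis/rest/services/RKI_COVID19/FeatureServer/0/query?where=1%3D1&outFields=*&outSR=4326&f=json
    json_normalize:
      record_path: features
    paginate:                          # fetch all pages via offset pagination (new in 0.1.2)
      offset:
        param: resultOffset
        get_offset_value: 'lambda res: len(res.json()["features"])'
    columns:
      - date: attributes.Meldedatum
      - state: attributes.Bundesland
      - value: attributes.AnzahlFall
    dt_index:
      column: date
"""

store = Datastore(config)
store.update()                          # paginated sources are stored as one file per page
df = store.a_local_csv.get_df()         # `.df` is replaced by the explicit `get_df()` in this release
df_rki = store.rki_json.get_df()        # pages are concatenated on load, as in test_paginate_by_offset
# NB: the remote RKI endpoint may change or disappear over time, as the FIXME in test_fetch.py notes.
```

The lambda strings are evaluated through the new `util.safe_eval`, which exposes only `pd`, `np` and a short whitelist of builtins, so expressions reaching for `__import__` fail with a `NameError` (exercised in `test_ops`). On the publishing side, a `cache_control` value on the `gcloud` handler is set on the blob and patched before `make_public()` (exercised in `test_gcloud_cache`).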
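
The pagination itself is a small recursive generator in `runpandarun/fetch.py` that keeps requesting pages until the configured `get_offset_value` lambda stops advancing the offset. A self-contained illustration with a fake response object follows; the `FakeResponse` class, the fake `get_request` and the page sizes are purely hypothetical stand-ins for the real HTTP call:

```python
# Illustration of the offset pagination generator added in runpandarun/fetch.py.
# The fake response and request below are hypothetical; only `paginate` and the
# config shape come from this diff.
from runpandarun.fetch import paginate

PAGES = {0: list(range(10)), 10: list(range(5)), 15: []}


class FakeResponse:
    def __init__(self, items):
        self._items = items

    def json(self):
        return {'features': self._items}


def get_request(resultOffset=0):
    # stands in for DatasetStorage.get_request(**{offset_param: offset})
    return FakeResponse(PAGES.get(resultOffset, []))


config = {'offset': {
    'param': 'resultOffset',
    'get_offset_value': 'lambda res: len(res.json()["features"])',
}}

responses = list(paginate(get_request, config))
assert len(responses) == 2   # the third request returns an empty page, so the recursion stops
```

Each page is deduplicated by content hash under its own `last_update_key--<page>` and stored as a separate `data--<page>.<timestamp>.<format>` file, and the loaders concatenate all stored files when `paginate` is configured, mirroring the existing `incremental` behaviour.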