Commit

Merge pull request #3 from simonwoerpel/develop
0.1.2
simonwoerpel authored Apr 17, 2020
2 parents 51d9d96 + 55401f4 commit 9d96baa
Showing 24 changed files with 262 additions and 84 deletions.
10 changes: 10 additions & 0 deletions .bumpversion.cfg
@@ -0,0 +1,10 @@
[bumpversion]
current_version = 0.1.2
tag_name = {new_version}
commit = True
tag = True

[bumpversion:file:setup.py]
search = version='{current_version}'
replace = version='{new_version}'

13 changes: 13 additions & 0 deletions CHANGES.md
@@ -0,0 +1,13 @@
# changelog

## 0.1.2

- Add pagination-by-offset logic
- Allow lambda functions as strings in YAML for column transformations and DataFrame operations
- Add a YAML option to set the cache header for Google Cloud Storage blobs
- Small bugfixes


## 0.1.1

- first release with basic functionality
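
The lambda-related entries above rely on evaluating strings from the YAML config into Python callables (the `safe_eval` helper imported from `runpandarun/util.py` in the diffs below; its implementation is not part of this commit). As a rough, hedged sketch of the idea only, not the library's actual helper:

```python
import pandas as pd


def eval_lambda(expr: str):
    """Hypothetical stand-in for runpandarun.util.safe_eval: turn a config
    string into a callable. A real helper should restrict the evaluation
    context much more aggressively than this."""
    func = eval(expr, {"__builtins__": {}}, {})  # illustration only
    if not callable(func):
        raise ValueError(f"not a callable expression: {expr}")
    return func


df = pd.DataFrame({"state": ["bayern", "berlin"]})
upper = eval_lambda("lambda x: x.upper()")  # string as it appears in example/config.yml
df["state"] = df["state"].map(upper)
```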
14 changes: 7 additions & 7 deletions README.md
@@ -53,7 +53,7 @@ store this as a file, and set the env var `CONFIG` to the path:
```python
from runpandarun.datasets import my_dataset, another_dataset

df = my_dataset.df
df = my_dataset.get_df()
df['name'].plot.hist()

another_dataset.daily.mean().plot() # some handy shorthands for pandas
@@ -74,15 +74,15 @@ store = Datastore()
store.update()

# save a revision
df = my_dataset.df
df = my_dataset.get_df()
df = wrangle(df)
my_dataset.save(df, 'wrangled')

# get this revision (in another script)
df = my_dataset['wrangled']

# publish
df = my_dataset.df
df = my_dataset.get_df()
clean(df)
my_dataset.publish(df, name='cleaned', overwrite=True)
```
@@ -340,7 +340,7 @@ store = Datastore(config)
ds = store.my_dataset
# all your datasets have their computed (according to your config) `pandas.DataFrame` as attribute:
df = store.my_dataset.df
df = store.my_dataset.get_df()

# get combined df (if specified in the config)
df = store.combined
@@ -387,9 +387,9 @@ After the workflow is done, you can publish some (or all) results.

```python
dataset = store.my_dataset
df1 = do_something_with(dataset.df)
df1 = do_something_with(dataset.get_df())
dataset.publish(df1, overwrite=True, include_source=True)
df2 = do_something_else_with(dataset.df)
df2 = do_something_else_with(dataset.get_df())
dataset.publish(df2, name='filtered_for_europe', format='json')
```

@@ -451,7 +451,7 @@ for, right?

```python
ds = store.my_dataset
df = ds.df
df = ds.get_df()
ds.revisions.save('transformed', df.T)
```

14 changes: 7 additions & 7 deletions README.rst
@@ -60,7 +60,7 @@ store this as a file, and set the env var ``CONFIG`` to the path:
from runpandarun.datasets import my_dataset, another_dataset
df = my_dataset.df
df = my_dataset.get_df()
df['name'].plot.hist()
another_dataset.daily.mean().plot() # some handy shorthands for pandas
@@ -81,15 +81,15 @@ Handle data persistence and state of datasets:
store.update()
# save a revision
df = my_dataset.df
df = my_dataset.get_df()
df = wrangle(df)
my_dataset.save(df, 'wrangled')
# get this revision (in another script)
df = my_dataset['wrangled']
# publish
df = my_dataset.df
df = my_dataset.get_df()
clean(df)
my_dataset.publish(df, name='cleaned', overwrite=True)
@@ -377,7 +377,7 @@ analysis scripts and focus on the analysis itself…
ds = store.my_dataset
# all your datasets have their computed (according to your config) `pandas.DataFrame` as attribute:
df = store.my_dataset.df
df = store.my_dataset.get_df()
# get combined df (if specified in the config)
df = store.combined
@@ -417,9 +417,9 @@ After the workflow is done, you can publish some (or all) results.
.. code:: python
dataset = store.my_dataset
df1 = do_something_with(dataset.df)
df1 = do_something_with(dataset.get_df())
dataset.publish(df1, overwrite=True, include_source=True)
df2 = do_something_else_with(dataset.df)
df2 = do_something_else_with(dataset.get_df())
dataset.publish(df2, name='filtered_for_europe', format='json')
For behaviour reasons the ``overwrite``-flag must be set explicitly
@@ -481,7 +481,7 @@ workflow is for, right?
.. code:: python
ds = store.my_dataset
df = ds.df
df = ds.get_df()
ds.revisions.save('transformed', df.T)
**load a revision**
6 changes: 5 additions & 1 deletion example/config.yml
@@ -34,6 +34,9 @@ datasets:
- state
- date
dt_index: date
ops:
- applymap:
func: 'lambda x: x.lower() if isinstance(x, str) else x'
a_local_json:
json_local: ./example/testdata.json
copy: true
@@ -42,6 +45,7 @@ datasets:
integer: str
columns:
- value: integer
- state
- state:
map: 'lambda x: x.upper()'
- date
dt_index: date
30 changes: 30 additions & 0 deletions example/rki_json.yml
@@ -0,0 +1,30 @@
storage:
  filesystem:
    enabled: true
    data_root: datastore-testdata/example
datasets:
  rki_json:
    json_url: https://services7.arcgis.com/mOBPykOjAyBO2ZKk/arcgis/rest/services/RKI_COVID19/FeatureServer/0/query?where=1%3D1&outFields=*&outSR=4326&f=json
    json_normalize:
      record_path: features
    paginate:
      offset:
        param: resultOffset
        get_offset_value: 'lambda res: len(res.json()["features"])'
    columns:
      - date: attributes.Meldedatum
      - last_updated: attributes.Datenstand
      - report_id: attributes.ObjectId
      - gender: attributes.Geschlecht
      - age: attributes.Altersgruppe
      - value: attributes.AnzahlFall
      - death_value: attributes.AnzahlTodesfall
      - new_case: attributes.NeuerFall
      - new_death_case: attributes.NeuerTodesfall
      - state_id: attributes.IdBundesland
      - state: attributes.Bundesland
      - district_id: attributes.IdLandkreis
      - district: attributes.Landkreis
    incremental: true
    dt_index:
      column: date
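
For orientation, the `json_normalize` and `columns` settings above roughly correspond to the following plain-pandas steps (hypothetical sample payload, not the library's internal code path):

```python
import pandas as pd

# stand-in for one page of the ArcGIS response
payload = {
    "features": [
        {"attributes": {"Meldedatum": 1587081600000, "Bundesland": "Bayern", "AnzahlFall": 3}},
        {"attributes": {"Meldedatum": 1587081600000, "Bundesland": "Berlin", "AnzahlFall": 1}},
    ]
}

# record_path: features -> one row per feature, nested keys flattened with dots
df = pd.json_normalize(payload, record_path="features")

# the `columns` list then selects and renames, e.g. `date: attributes.Meldedatum`
df = df.rename(columns={
    "attributes.Meldedatum": "date",
    "attributes.Bundesland": "state",
    "attributes.AnzahlFall": "value",
})[["date", "state", "value"]]
```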
20 changes: 17 additions & 3 deletions runpandarun/columns.py
@@ -1,22 +1,36 @@
import banal

from .exceptions import ConfigError
from .util import safe_eval


def wrangle_columns(df, config):
use_columns = []
rename_columns = {}
map_funcs = {}
for column in config.columns:
if isinstance(column, str):
use_columns.append(column)
elif isinstance(column, dict):
elif banal.is_mapping(column):
if len(column) > 1:
raise ConfigError(f'Column config `{column}` has errors.')
target, source = list(column.items())[0]
use_columns.append(source)
rename_columns[source] = target
if banal.is_mapping(source):
source_column = source.get('column', target)
map_func = source.get('map')
if map_func:
map_funcs[target] = safe_eval(map_func)
else:
source_column = source
use_columns.append(source_column)
rename_columns[source_column] = target
else:
raise ConfigError(f'Column config `{column}` has errors.')

df = df[use_columns]
if rename_columns:
df = df.rename(columns=rename_columns)
if map_funcs:
for col, func in map_funcs.items():
df[col] = df[col].map(func)
return df
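
A short usage sketch of the three column-config shapes this now supports: a plain column name, a rename mapping, and the new mapping form with a `map` lambda. The `SimpleNamespace` stands in for the real config object, and the string lambda's behaviour assumes `safe_eval` turns it into a callable:

```python
from types import SimpleNamespace

import pandas as pd

from runpandarun.columns import wrangle_columns

df = pd.DataFrame({
    "Datum": ["2020-04-16", "2020-04-17"],
    "AnzahlFall": [3, 1],
    "Bundesland": ["bayern", "berlin"],
})

config = SimpleNamespace(columns=[
    "Datum",                                   # keep column as-is
    {"value": "AnzahlFall"},                   # rename source -> target
    {"state": {"column": "Bundesland",         # rename and transform
               "map": "lambda x: x.upper()"}},
])

out = wrangle_columns(df, config)
# -> columns: Datum, value, state (upper-cased)
```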
2 changes: 1 addition & 1 deletion runpandarun/combine.py
@@ -23,6 +23,6 @@ def concat_long(dfs):


def concat_wide(datasets):
dfs = (ds.df.rename(columns={c: f'{ds.name}.{c}' for c in ds.df.columns}) for ds in datasets)
dfs = (ds._df.rename(columns={c: f'{ds.name}.{c}' for c in ds._df.columns}) for ds in datasets)
df = pd.concat(dfs, axis=1)
return df
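
For reference, the wide combine prefixes every column with its dataset name before joining on the index; roughly the same thing in plain pandas, with made-up frames and names:

```python
import pandas as pd

idx = pd.to_datetime(["2020-04-16", "2020-04-17"])
a = pd.DataFrame({"value": [1, 2]}, index=idx)
b = pd.DataFrame({"value": [10, 20]}, index=idx)

wide = pd.concat(
    [a.rename(columns={c: f"dataset_a.{c}" for c in a.columns}),
     b.rename(columns={c: f"dataset_b.{c}" for c in b.columns})],
    axis=1,
)
# -> columns: dataset_a.value, dataset_b.value
```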
14 changes: 6 additions & 8 deletions runpandarun/dataset.py
@@ -50,8 +50,6 @@ def __init__(self, name, config, store):
self.config = Config({**DEFAULT_CONFIG, **config})
self.store = store
self._storage = DatasetStorage(name, config, store._storage)
self._base_df = None
self._df = None
self.revisions = DatasetRevisions(self)

# provide handy pd shortcuts
@@ -72,7 +70,7 @@ def __setitem__(self, name, item):
self.revisions.save(name, item)

@cached_property
def df(self):
def _df(self):
return self.get_df()

@cached_property
@@ -107,7 +105,7 @@ def load(self):

def publish(self, df=None, **kwargs):
if df is None:
df = self.df
df = self._df
config = self.config.update({'publish': self.store.config.publish or {}}) # FIXME hrmpf
return publish.publish(self, df, config, **kwargs)

@@ -130,12 +128,12 @@ def resample(self, interval, method):
if method not in RESAMPLE_METHODS.keys():
raise ConfigError(f'Resampling method `{method}` not valid.') # noqa
if method == 'count': # FIXME implementation?
df = self.df.copy()
df = self._df.copy()
df['count'] = 1
return df.resample(interval)[['count']].count()
return self.df[self.numeric_cols()].resample(interval).apply(RESAMPLE_METHODS[method])
return self._df[self.numeric_cols()].resample(interval).apply(RESAMPLE_METHODS[method])

def numeric_cols(self):
for col in self.df.columns:
if pd.api.types.is_numeric_dtype(self.df[col]):
for col in self._df.columns:
if pd.api.types.is_numeric_dtype(self._df[col]):
yield col
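
In short, the public accessor is now `get_df()` while the cached frame lives on the private `_df` used internally by `resample()` and `numeric_cols()`. A minimal call-pattern sketch, assuming a configured store as in the README; the import path and dataset name are placeholders:

```python
from runpandarun import Datastore  # assumed package-root import

store = Datastore()            # reads the config pointed to by the CONFIG env var
ds = store.my_dataset          # placeholder dataset name from the config

df = ds.get_df()               # public accessor (was `ds.df` before this change)
ds.daily.mean().plot()         # resampling shorthand from the README still works
```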
19 changes: 19 additions & 0 deletions runpandarun/fetch.py
@@ -0,0 +1,19 @@
from .exceptions import ConfigError
from .util import ensure_singlekey_dict, safe_eval


def paginate_offset(get_request, offset_param, get_offset_value, offset=0):
    res = get_request(**{offset_param: offset})
    new_offset = offset + get_offset_value(res)
    if new_offset > offset:
        yield res
        yield from paginate_offset(get_request, offset_param, get_offset_value, new_offset)


def paginate(get_request, config):
    method, config = ensure_singlekey_dict(config)
    if method is None:
        raise ConfigError(f'Please make sure {config} is properly configured as single-key dict!')
    if method != 'offset':
        raise ConfigError(f'Other pagination method than `{method}` currently not registered')
    yield from paginate_offset(get_request, config['param'], safe_eval(config['get_offset_value']))
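
A hedged sketch of how these generators could be driven for the endpoint configured in example/rki_json.yml; the `get_request` closure below is illustrative and not the library's internal fetch code:

```python
import requests

from runpandarun.fetch import paginate

URL = ("https://services7.arcgis.com/mOBPykOjAyBO2ZKk/arcgis/rest/services/"
       "RKI_COVID19/FeatureServer/0/query")
BASE_PARAMS = {"where": "1=1", "outFields": "*", "outSR": 4326, "f": "json"}


def get_request(**params):
    # paginate_offset injects the offset parameter (here: resultOffset)
    return requests.get(URL, params={**BASE_PARAMS, **params})


config = {
    "offset": {
        "param": "resultOffset",
        "get_offset_value": 'lambda res: len(res.json()["features"])',
    }
}

# yields one response per page until a page comes back without new records
for page in paginate(get_request, config):
    print(len(page.json()["features"]))
```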
4 changes: 2 additions & 2 deletions runpandarun/load.py
@@ -10,7 +10,7 @@ def _load_csv(source, config):


def load_csv(source, config):
if config.incremental:
if config.incremental or config.paginate:
return pd.concat(_load_csv(s, config.read or {}) for s in source)
return _load_csv(source, config.read or {})

@@ -25,6 +25,6 @@ def _load_json(source, config):


def load_json(source, config):
if config.incremental:
if config.incremental or config.paginate:
return pd.concat(_load_json(s, config) for s in source)
return _load_json(source, config)
3 changes: 2 additions & 1 deletion runpandarun/ops.py
@@ -2,6 +2,7 @@
from pandas import DataFrame

from .exceptions import ConfigError
from .util import safe_eval


def apply_ops(df, ops):
@@ -17,7 +18,7 @@ def apply_ops(df, ops):
op_args = list(op.values())
if len(op_args) > 1:
raise ConfigError(f'Operation arguments not valid: {op_args} - should be only 1 mapping item.')
op_args = op_args[0]
op_args = {k: safe_eval(v) if k == 'func' else v for k, v in op_args[0].items()}
func = getattr(DataFrame, op_name, None)
if func is None or not callable(func):
raise ConfigError(f'{op} is not a valid operation for `pd.DataFrame`')
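
A usage sketch of the string-lambda support added here, mirroring the new `ops` entry in example/config.yml; the exact call shape beyond what the diff shows is an assumption:

```python
import pandas as pd

from runpandarun.ops import apply_ops

df = pd.DataFrame({"state": ["Bayern", "BERLIN"], "value": [3, 1]})

# same structure as the `ops:` block in example/config.yml, after YAML parsing
ops = [
    {"applymap": {"func": "lambda x: x.lower() if isinstance(x, str) else x"}},
]

df = apply_ops(df, ops)
# -> string cells lower-cased, numeric cells untouched
```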
8 changes: 6 additions & 2 deletions runpandarun/publish.py
@@ -3,22 +3,24 @@
from datetime import datetime
from urllib.parse import urljoin

from .exceptions import ConfigError
from .ops import apply_ops
from .storage import get_backend


class Handler:
def __init__(self, dataset, df, config, backend):
self.enabled = banal.as_bool(config.enabled)
self.dataset = dataset
self.df = df
self.config = config
self.backend = backend
self.name = config.get('name', dataset.name)
self.format = config.get('format', dataset.format)
self.overwrite = config.get('overwrite')
self.with_timestamp = config.get('with_timestamp')
self.dump = getattr(df, f'to_{self.format}')
self.base_path = self.get_base_path()
df = apply_ops(df, config.get('clean', {}))
self.dump = getattr(df, f'to_{self.format}')

def get_base_path(self):
return self.backend.get_path(self.dataset.name)
@@ -52,6 +54,8 @@ def get_file_name(self):


def _publish(dataset, df, config, **kwargs):
if config.publish is None:
raise ConfigError('Add a publish handler config to be able to publish datasets.')
for handler, handler_config in config.publish['handlers'].items():
if banal.as_bool(handler_config.get('enabled', True)):
backend = get_backend(handler, handler_config)
9 changes: 6 additions & 3 deletions runpandarun/storage/gcloud.py
@@ -43,15 +43,18 @@ def exists(self, path):

def store(self, path, content, publish=False):
ext = os.path.splitext(path)[1]
if ext == 'csv':
if ext == '.csv':
content_type = 'text/csv'
elif ext == 'json':
elif ext == '.json':
content_type = 'application/json'
else:
content_type = 'text/plain'
blob = self._get_blob(path)
blob.upload_from_string(content, content_type)
blob.upload_from_string(content, content_type=content_type)
if publish:
if self.config.cache_control:
blob.cache_control = self.config.cache_control
blob.patch()
blob.make_public()
return blob.public_url
return self.get_path(path)
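
The new `cache_control` handling maps onto the standard google-cloud-storage blob API; a standalone sketch with placeholder bucket, path and header values:

```python
from google.cloud import storage

client = storage.Client()
bucket = client.bucket("my-publish-bucket")      # placeholder bucket name
blob = bucket.blob("my_dataset/cleaned.csv")     # placeholder object path

blob.upload_from_string("id,value\n1,2\n", content_type="text/csv")

# mirror the yaml `cache_control` option: set the header, persist it, then publish
blob.cache_control = "public, max-age=300"       # example value
blob.patch()
blob.make_public()
print(blob.public_url)
```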
(Diffs for the remaining changed files were not loaded.)
