Have you ever wondered how to level up your data processing game? If you're transitioning from ad-hoc analytics and researching options, this might be a good starting point.
This project has two main modules:

- `local`, which shows how to set up a simple data processing pipeline using Luigi, Python, Pandas and Postgres in no time. Though simple, this approach can get you pretty far.
- `cloud`, which illustrates how you can easily swap out:
  - local storage in favour of durable and distributed Google Cloud Storage,
  - local processing power in favour of scalable Google Dataflow,
  - a local PostgreSQL database that you need to manage yourself in favour of BigQuery, which has a familiar SQL interface, can process TBs of data without breaking a sweat, and integrates nicely with GSuite accounts.
There is another module, `sampledata`, which is used to generate sample data.
To make it a bit more interesting, imagine that the data is from a car rental company called DailyCar.
Specifically, we have the following information (under `sampledata/generated/`):

- `users.csv` has information about registered clients of DailyCar.
- `cars.csv` has information about its car park.
- `rents.csv` contains a list of rents, specifically, who rented which car and when.
- `fines.csv` is pulled from the police database and helps us see all the fines (like speeding fines) related to the company's cars.
The business would like to enrich the information about fines, so it can tell who was driving a specific car at a particular point in time. More formally, we need to generate a table with the following fields (shown transposed):
| column | data |
|---|---|
| fine_id | 1 |
| fine_amount | 15 |
| fine_registered_at | 2017-10-01 21:36:00 |
| rent_id | 1 |
| rented_on | 2017-10-01 |
| car_id | 3 |
| car_reg_number | ks2888 |
| car_make | bmw |
| car_model | series_2 |
| user_id | 3 |
| user_name | Dumitru Matei |
| user_passport_no | 482850738 |
| user_birth_date | 1966-06-22 |
| user_driving_permit_since | 1991-10-18 |
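Conceptually, this is just a chain of joins: enrich rents with car and user details, then match each fine to the rent that covered that car on that day. A rough pandas sketch of the idea (the CSV column names here are assumptions, except for `cars.csv`; the real implementations live in the `local` and `cloud` modules):

```python
import pandas as pd

# Rough sketch only: column names for users.csv, rents.csv and fines.csv are
# guesses based on the target table above.
base = 'sampledata/generated/'
cars = pd.read_csv(base + 'cars.csv').rename(columns={
    'id': 'car_id', 'reg_number': 'car_reg_number',
    'make': 'car_make', 'model': 'car_model'})
users = pd.read_csv(base + 'users.csv')
rents = pd.read_csv(base + 'rents.csv')
fines = pd.read_csv(base + 'fines.csv')

# 1. Enrich rents with car and user details.
rich_rents = rents.merge(cars, on='car_id').merge(users, on='user_id')

# 2. Attach each fine to the rent that covered that car on that day.
rich_fines = fines.merge(
    rich_rents,
    left_on=['car_reg_number', 'registered_on'],
    right_on=['car_reg_number', 'rented_on'])
print(rich_fines.head())
```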
We'll demonstrate how to build an ETL pipeline around this problem under the `local` and `cloud` modules.
Also, feel free to tune parameters in `sampledata/generate.py` to get more or less data to work with.
First, make sure you have Python 2.7 installed.
Then, inside the project's root folder, execute the following commands to install the required packages:
```
$ pip install pipenv
$ pipenv install --skip-lock
```
For the `local` part, you need to install PostgreSQL and create a database and a user, like this:
```
> psql postgres
=# create role dwh login password 'dwh';
=# create database data_zero_to_cloud owner dwh;
```
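If you want to double-check that the database is reachable with those credentials, a quick sanity check from Python could look like this (it assumes `psycopg2` is installed, which is not part of the setup steps above):

```python
import psycopg2

# Connection parameters match the role and database created above.
conn = psycopg2.connect(dbname='data_zero_to_cloud', user='dwh',
                        password='dwh', host='localhost')
with conn.cursor() as cur:
    cur.execute('SELECT version()')
    print(cur.fetchone()[0])
conn.close()
```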
For the `cloud` part, you need to obtain Google Cloud service credentials and put them under `config/credentials.json`. Don't forget to update `config/config.ini` accordingly.
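To make sure the credentials file is picked up correctly, you can list your buckets with the `google-cloud-storage` client (an optional check; it assumes that package is installed):

```python
from google.cloud import storage

# Build a client straight from the service account key referenced above.
client = storage.Client.from_service_account_json('config/credentials.json')
print([b.name for b in client.list_buckets()])
```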
To run an ETL task use the following command:
```
$ ./run-luigi.py --local-scheduler --module=MODULE_NAME TASK_NAME --on=DATE
```
Replace `TASK_NAME` with the name of a defined task, like `ProcessFines`. The `DATE` parameter can take any value (for our purposes it doesn't matter much which), for instance `2017-11-16`. `MODULE_NAME` can be either `local` or `cloud`.
For example:
```
$ ./run-luigi.py --local-scheduler --module=cloud ProcessFines --on=2017-11-16
```
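For orientation, a task like `ProcessFines` is essentially a Luigi task with a date parameter; here is a minimal sketch of that shape (the output path and body are made up, see the actual modules for the real definitions):

```python
import luigi

class ProcessFines(luigi.Task):
    # Filled in from the --on=DATE command line flag.
    on = luigi.DateParameter()

    def output(self):
        # Hypothetical output location; the real tasks write elsewhere.
        return luigi.LocalTarget('data/{}/rich_fines.csv'.format(self.on))

    def run(self):
        # The real task joins fines with rents, cars and users here.
        with self.output().open('w') as out:
            out.write('fine_id,fine_amount\n')
```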
If you want to go really wild, change the `runner` parameter in `config.ini` to `DataflowRunner` and unleash the full power of the cloud: the Apache Beam tasks will then run on Google Dataflow.
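Conceptually, that `runner` setting ends up in Apache Beam's pipeline options, roughly like this (the project and bucket below are placeholders, not values from this repo):

```python
from apache_beam.options.pipeline_options import PipelineOptions

# 'DirectRunner' executes locally; 'DataflowRunner' ships the job to
# Google Dataflow. Project and temp_location are placeholder values.
options = PipelineOptions(
    runner='DataflowRunner',
    project='my-gcp-project',
    temp_location='gs://my-bucket/tmp')
```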
After you run a `cloud` ETL, you may want to see the result. If you have a Google Cloud account and your own credentials, feel free to go to the web console. Otherwise, obtain the workshop host's credentials and use the `./shell.py` script to load an IPython session with some predefined functions, such as `gls` and `gcat`.
An example usage is below:
```
In [5]: gls('2017-11-15')
Out[5]:
[<Blob: warehouse-in-gcs-store, 2017-11-15/cars.csv>,
 <Blob: warehouse-in-gcs-store, 2017-11-15/fines.csv>,
 <Blob: warehouse-in-gcs-store, 2017-11-15/rents.csv>,
 <Blob: warehouse-in-gcs-store, 2017-11-15/rich_fines/_SUCCESS>,
 <Blob: warehouse-in-gcs-store, 2017-11-15/rich_fines/data.csv-00000-of-00001>,
 <Blob: warehouse-in-gcs-store, 2017-11-15/users.csv>]

In [6]: gcat('2017-11-15/cars.csv')
id,make,model,reg_number
1,nissan,murano,ko2116
2,hyundai,solaris,ct8988
3,bmw,series_2,ks2888

In [7]: gcat('2017-11-15/rich_fines/data.csv-00000-of-00001')
fine_id,fine_amount,fine_registered_at,rent_id,rented_on,car_id,car_reg_number,car_make,car_model,user_id,user_name,user_passport_no,user_birth_date,user_driving_permit_since
8,1,2017-10-03 09:09:00,7,2017-10-03,1,ko2116,nissan,murano,1,Cristina Ciobanu,547345952,1988-02-17,1991-02-27
...
```
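In case you're curious, helpers like `gls` and `gcat` take only a few lines on top of `google-cloud-storage`; a rough sketch (the bucket name is taken from the listing above, the actual `shell.py` may differ):

```python
from google.cloud import storage

client = storage.Client.from_service_account_json('config/credentials.json')
bucket = client.bucket('warehouse-in-gcs-store')

def gls(prefix):
    """List blobs under a prefix, e.g. gls('2017-11-15')."""
    return list(bucket.list_blobs(prefix=prefix))

def gcat(path):
    """Print the contents of a single blob, e.g. gcat('2017-11-15/cars.csv')."""
    print(bucket.blob(path).download_as_string())
```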
Practice makes perfect, so if you'd like to go a little bit deeper, here are some ideas to try:
- Task `local.LoadRichFines` will not replace the contents of the table, which may not be desirable, especially if you run your ETL several times a day. Try to implement a task that inherits from `luigi.contrib.postgres.CopyToTable` and disregards whether it was run before or not.
- Similarly, `cloud.LoadRichFines` won't replace a table in BigQuery. Try to fix this.
- There's a bit of boilerplate in `cloud.ProcessFines` with `Map`s and `CoGroupByKey`s. Try to implement a custom `Join` transform that does an SQL-style join on two `PCollection`s (one possible approach is sketched after this list). Example usage is:

  ```python
  ((rich_rents, fines)
   | Join(left_on=lambda x: (x['car_reg_number'], x['rented_on']),
          right_on=lambda x: (x['car_reg_number'], x['registered_on'])))
  ```
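For the last idea, here is one possible shape for such a `Join` transform, built on `CoGroupByKey`; treat it as a sketch to compare against your own solution, not the project's implementation:

```python
import apache_beam as beam

class Join(beam.PTransform):
    """One possible inner join of two PCollections of dicts on computed keys."""

    def __init__(self, left_on, right_on):
        super(Join, self).__init__()
        self.left_on = left_on
        self.right_on = right_on

    def expand(self, pcolls):
        left, right = pcolls
        left_on, right_on = self.left_on, self.right_on

        def merge(kv):
            # After CoGroupByKey: (key, {'left': [rows], 'right': [rows]}).
            _, grouped = kv
            for l in grouped['left']:
                for r in grouped['right']:
                    row = dict(l)
                    row.update(r)
                    yield row

        return ({'left': left | 'KeyLeft' >> beam.Map(lambda x: (left_on(x), x)),
                 'right': right | 'KeyRight' >> beam.Map(lambda x: (right_on(x), x))}
                | 'Group' >> beam.CoGroupByKey()
                | 'Merge' >> beam.FlatMap(merge))
```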