diff --git a/README.md b/README.md index 136a9e19..378e17c4 100644 --- a/README.md +++ b/README.md @@ -3,7 +3,6 @@ These demos run under binder and can be found at: * [Spark and Iceberg](https://mybinder.org/v2/gh/projectnessie/nessie-demos/main?labpath=notebooks%2Fnessie-iceberg-demo-nba.ipynb) -* [Spark and Delta](https://mybinder.org/v2/gh/projectnessie/nessie-demos/main?labpath=notebooks%2Fnessie-delta-demo-nba.ipynb) * [Flink and Iceberg](https://mybinder.org/v2/gh/projectnessie/nessie-demos/main?labpath=notebooks%2Fnessie-iceberg-flink-demo-nba.ipynb) * [Hive and Iceberg](https://mybinder.org/v2/gh/projectnessie/nessie-demos/main?labpath=notebooks%2Fnessie-iceberg-hive-demo-nba.ipynb) @@ -24,12 +23,6 @@ Nessie version is set in Binder at `docker/binder/requirements_base.txt`. Curren Currently we are using Iceberg `0.13.1` and it is specified in both iceberg notebooks as well as `docker/utils/__init__.py` -### Delta - -Currently, the Delta version is taken directly from the Nessie version and isn't explicitly noted. It is currently `1.1.0-nessie` - -See https://github.com/projectnessie/nessie/blob/nessie-0.30.0/pom.xml#L171 - ### Spark Only has to be updated in `docker/binder/requirements.txt`. Currently, Iceberg supports 3.0, 3.1 and 3.2 while deltalake supports diff --git a/notebooks/nessie-delta-demo-nba.ipynb b/notebooks/nessie-delta-demo-nba.ipynb deleted file mode 100644 index 6d8f5e2e..00000000 --- a/notebooks/nessie-delta-demo-nba.ipynb +++ /dev/null @@ -1,1666 +0,0 @@ -{ - "cells": [ - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "Nessie Spark SQL Demo with NBA Dataset\n", - "============================\n", - "This demo showcases how to use Nessie Python API along with Spark3 from Delta Lake\n", - "\n", - "Initialize Pyspark\n", - "----------------------------------------------\n", - "To get started, we will first have to do a few setup steps that give us everything we need\n", - "to get started with Nessie. In case you're interested in the detailed setup steps for Spark, you can check out the [docs](https://projectnessie.org/tools/deltalake/spark/)\n", - "\n", - "The Binder server has downloaded spark and some data for us as well as started a Nessie server in the background. All we have to do is start Spark\n", - "\n", - "The below cell starts a local Spark session with parameters needed to configure Nessie. Each config option is followed by a comment explaining its purpose." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "pycharm": { - "name": "#%%\n" - } - }, - "outputs": [ - { - "name": "stderr", - "output_type": "stream", - "text": [ - "WARNING: An illegal reflective access operation has occurred\n", - "WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (file:/home/jovyan/spark-3.2.1-bin-hadoop3.2/jars/spark-unsafe_2.12-3.2.1.jar) to constructor java.nio.DirectByteBuffer(long,int)\n", - "WARNING: Please consider reporting this to the maintainers of org.apache.spark.unsafe.Platform\n", - "WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations\n", - "WARNING: All illegal access operations will be denied in a future release\n", - "https://storage.googleapis.com/nessie-maven added as a remote repository with the name: repo-1\n", - "Ivy Default Cache set to: /home/jovyan/.ivy2/cache\n", - "The jars for the packages stored in: /home/jovyan/.ivy2/jars\n", - "org.projectnessie#nessie-deltalake added as a dependency\n", - "org.projectnessie#nessie-spark-3.2-extensions added as a dependency\n", - ":: resolving dependencies :: org.apache.spark#spark-submit-parent-2ab7f1e0-01bb-42fd-bb2f-6c1b59cdc6dd;1.0\n", - "\tconfs: [default]\n" - ] - }, - { - "name": "stdout", - "output_type": "stream", - "text": [ - ":: loading settings :: url = jar:file:/home/jovyan/spark-3.2.1-bin-hadoop3.2/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml\n" - ] - }, - { - "name": "stderr", - "output_type": "stream", - "text": [ - "\tfound org.projectnessie#nessie-deltalake;0.30.0 in central\n", - "\tfound org.antlr#antlr4-runtime;4.9.2 in central\n", - "\tfound org.projectnessie#nessie-spark-3.2-extensions;0.30.0 in central\n", - "downloading https://repo1.maven.org/maven2/org/projectnessie/nessie-deltalake/0.30.0/nessie-deltalake-0.30.0.jar ...\n", - "\t[SUCCESSFUL ] org.projectnessie#nessie-deltalake;0.30.0!nessie-deltalake.jar (375ms)\n", - "downloading https://repo1.maven.org/maven2/org/projectnessie/nessie-spark-3.2-extensions/0.30.0/nessie-spark-3.2-extensions-0.30.0.jar ...\n", - "\t[SUCCESSFUL ] org.projectnessie#nessie-spark-3.2-extensions;0.30.0!nessie-spark-3.2-extensions.jar (87ms)\n", - "downloading https://repo1.maven.org/maven2/org/antlr/antlr4-runtime/4.9.2/antlr4-runtime-4.9.2.jar ...\n", - "\t[SUCCESSFUL ] org.antlr#antlr4-runtime;4.9.2!antlr4-runtime.jar (59ms)\n", - ":: resolution report :: resolve 19292ms :: artifacts dl 524ms\n", - "\t:: modules in use:\n", - "\torg.antlr#antlr4-runtime;4.9.2 from central in [default]\n", - "\torg.projectnessie#nessie-deltalake;0.30.0 from central in [default]\n", - "\torg.projectnessie#nessie-spark-3.2-extensions;0.30.0 from central in [default]\n", - "\t---------------------------------------------------------------------\n", - "\t| | modules || artifacts |\n", - "\t| conf | number| search|dwnlded|evicted|| number|dwnlded|\n", - "\t---------------------------------------------------------------------\n", - "\t| default | 3 | 3 | 3 | 0 || 3 | 3 |\n", - "\t---------------------------------------------------------------------\n", - ":: retrieving :: org.apache.spark#spark-submit-parent-2ab7f1e0-01bb-42fd-bb2f-6c1b59cdc6dd\n", - "\tconfs: [default]\n", - "\t3 artifacts copied, 0 already retrieved (4023kB/6ms)\n", - "22/05/24 07:49:11 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable\n", - "Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties\n", - "Setting default log level to \"WARN\".\n", - "To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).\n" - ] - }, - { - "name": "stdout", - "output_type": "stream", - "text": [ - "Spark Running\n" - ] - } - ], - "source": [ - "import os\n", - "import findspark\n", - "from pyspark.sql import *\n", - "from pyspark import SparkConf\n", - "import pynessie\n", - "\n", - "findspark.init()\n", - "pynessie_version = pynessie.__version__\n", - "\n", - "warehouse = \"file://\" + os.getcwd() + \"/spark_warehouse/delta\"\n", - "conf = SparkConf()\n", - "# we add our custom fork of delta to the known repositories\n", - "conf.set(\"spark.jars.repositories\", \"https://storage.googleapis.com/nessie-maven\")\n", - "# we need delta libraries and the nessie sql extensions\n", - "conf.set(\n", - " \"spark.jars.packages\",\n", - " f\"org.projectnessie:nessie-deltalake:{pynessie_version},org.projectnessie:nessie-spark-3.2-extensions:{pynessie_version}\",\n", - ")\n", - "# ensure python <-> java interactions are w/ pyarrow\n", - "conf.set(\"spark.sql.execution.pyarrow.enabled\", \"true\")\n", - "# create catalog dev_catalog as a Delta catalog\n", - "conf.set(\"spark.sql.catalog.spark_catalog\", \"org.apache.spark.sql.delta.catalog.DeltaCatalog\")\n", - "# set the location for Nessie catalog to store data. Spark writes to this directory\n", - "conf.set(\"spark.sql.catalog.spark_catalog.warehouse\", warehouse)\n", - "# set the location of the nessie server. In this demo its running locally. There are many ways to run it (see https://projectnessie.org/try/)\n", - "conf.set(\"spark.sql.catalog.spark_catalog.uri\", \"http://localhost:19120/api/v1\")\n", - "# default branch for Nessie catalog to work on\n", - "conf.set(\"spark.sql.catalog.spark_catalog.ref\", \"main\")\n", - "# use no authorization. Options are NONE AWS BASIC and aws implies running Nessie on a lambda\n", - "conf.set(\"spark.sql.catalog.spark_catalog.auth_type\", \"NONE\")\n", - "# These two lines tell Delta to use Nessie as the internal storage handler thereby enabling Delta/Nessie integraton\n", - "conf.set(\"spark.delta.logFileHandler.class\", \"org.projectnessie.deltalake.NessieLogFileMetaParser\")\n", - "conf.set(\"spark.delta.logStore.class\", \"org.projectnessie.deltalake.NessieLogStore\")\n", - "# enable the extensions for both Nessie and Delta\n", - "conf.set(\n", - " \"spark.sql.extensions\",\n", - " \"io.delta.sql.DeltaSparkSessionExtension,org.projectnessie.spark.extensions.NessieSpark32SessionExtensions\",\n", - ")\n", - "# finally, start up the Spark server\n", - "spark = SparkSession.builder.config(conf=conf).getOrCreate()\n", - "print(\"Spark Running\")" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "Solving Data Engineering problems with Nessie\n", - "============================\n", - "\n", - "In this Demo we are a data engineer working at a fictional sports analytics blog. In order for the authors to write articles they have to have access to the relevant data. They need to be able to retrieve data quickly and be able to create charts with it.\n", - "\n", - "We have been asked to collect and expose some information about basketball players. We have located some data sources and are now ready to start ingesting data into our data lakehouse. We will perform the ingestion steps on a Nessie branch to test and validate the data before exposing to the analysts." - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "Set up Nessie branches\n", - "----------------------------\n", - "Once all dependencies are configured, we can get started with ingesting our basketball data into `Nessie` with the following steps:\n", - "\n", - "- Create a new branch named `dev`\n", - "- List all branches\n", - "\n", - "It is worth mentioning that we don't have to explicitly create a `main` branch, since it's the default branch." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [ - { - "data": { - "text/html": [ - "
\n", - "\n", - "\n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - "
refTypenamehash
0Branchdev2e1cfa82b035c26cbbbdae632cea070514eb8b773f616a...
\n", - "
" - ], - "text/plain": [ - " refType name hash\n", - "0 Branch dev 2e1cfa82b035c26cbbbdae632cea070514eb8b773f616a..." - ] - }, - "execution_count": null, - "metadata": {}, - "output_type": "execute_result" - } - ], - "source": [ - "spark.sql(\"CREATE BRANCH dev FROM main\").toPandas()" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "We have created the branch `dev` and we can see the branch with the Nessie `hash` its currently pointing to.\n", - "\n", - "Below we list all branches. Note that the auto created `main` branch already exists and both branches point at the same empty `hash` initially" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [ - { - "data": { - "text/html": [ - "
\n", - "\n", - "\n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - "
refTypenamehash
0Branchdev2e1cfa82b035c26cbbbdae632cea070514eb8b773f616a...
1Branchmain2e1cfa82b035c26cbbbdae632cea070514eb8b773f616a...
\n", - "
" - ], - "text/plain": [ - " refType name hash\n", - "0 Branch dev 2e1cfa82b035c26cbbbdae632cea070514eb8b773f616a...\n", - "1 Branch main 2e1cfa82b035c26cbbbdae632cea070514eb8b773f616a..." - ] - }, - "execution_count": null, - "metadata": {}, - "output_type": "execute_result" - } - ], - "source": [ - "spark.sql(\"LIST REFERENCES\").toPandas()" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "Create tables under dev branch\n", - "-------------------------------------\n", - "Once we created the `dev` branch and verified that it exists, we can create some tables and add some data.\n", - "\n", - "We create two tables under the `dev` branch:\n", - "- `salaries`\n", - "- `totals_stats`\n", - "\n", - "These tables list the salaries per player per year and their stats per year.\n", - "\n", - "To create the data we:\n", - "\n", - "1. switch our branch context to dev\n", - "2. create the table\n", - "3. insert the data from an existing csv file. This csv file is already stored locally on the demo machine. A production use case would likely take feeds from official data sources" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [ - { - "name": "stderr", - "output_type": "stream", - "text": [ - " \r" - ] - }, - { - "data": { - "text/html": [ - "
\n", - "\n", - "\n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - "
\n", - "
" - ], - "text/plain": [ - "Empty DataFrame\n", - "Columns: []\n", - "Index: []" - ] - }, - "execution_count": null, - "metadata": {}, - "output_type": "execute_result" - } - ], - "source": [ - "spark.sql(\"USE REFERENCE dev\")\n", - "\n", - "# Creating `salaries` table\n", - "spark.sql(\n", - " \"\"\"CREATE TABLE IF NOT EXISTS salaries (Season STRING, Team STRING, Salary STRING, Player STRING)\n", - " USING delta LOCATION '{}/salaries'\"\"\".format(\n", - " warehouse\n", - " )\n", - ")\n", - "\n", - "spark.sql(\n", - " \"\"\"CREATE OR REPLACE TEMPORARY VIEW salaries_table USING csv\n", - " OPTIONS (path \"../datasets/nba/salaries.csv\", header true)\"\"\"\n", - ")\n", - "spark.sql(\"INSERT INTO salaries SELECT * FROM salaries_table\")\n", - "\n", - "# Creating `totals_stats` table\n", - "spark.sql(\n", - " \"\"\"CREATE TABLE IF NOT EXISTS totals_stats (\n", - " Season STRING, Age STRING, Team STRING, ORB STRING, DRB STRING, TRB STRING, AST STRING, STL STRING,\n", - " BLK STRING, TOV STRING, PTS STRING, Player STRING, RSorPO STRING)\n", - " USING delta LOCATION '{}/totals_stats'\"\"\".format(\n", - " warehouse\n", - " )\n", - ")\n", - "spark.sql(\n", - " \"\"\"CREATE OR REPLACE TEMPORARY VIEW stats_table USING csv\n", - " OPTIONS (path \"../datasets/nba/totals_stats.csv\", header true)\"\"\"\n", - ")\n", - "spark.sql(\"INSERT INTO totals_stats SELECT * FROM stats_table\").toPandas()" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "Now we count the rows in our tables to ensure they are the same number as the csv files." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "jupyter": { - "outputs_hidden": false - }, - "pycharm": { - "name": "#%%\n" - } - }, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "50\n", - "92\n" - ] - } - ], - "source": [ - "table_count = spark.sql(\"select count(*) from salaries\").toPandas().values[0][0]\n", - "csv_count = spark.sql(\"select count(*) from salaries_table\").toPandas().values[0][0]\n", - "assert table_count == csv_count\n", - "print(table_count)\n", - "\n", - "table_count = spark.sql(\"select count(*) from totals_stats\").toPandas().values[0][0]\n", - "csv_count = spark.sql(\"select count(*) from stats_table\").toPandas().values[0][0]\n", - "assert table_count == csv_count\n", - "print(table_count)" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "Check generated tables\n", - "----------------------------\n", - "Since we have been working solely on the `dev` branch, where we created 2 tables and added some data,\n", - "let's verify that the `main` branch was not altered by our changes.\n", - "\n", - "Note: `SHOW TABLES` does not work on Delta because the Delta Catalog has no concept of references. We have to use the command line instead.\n", - "In this demo we are switching the reference around regularly which means `SHOW TABLES` isn't always reliable. In the situation where\n", - "your Spark job is only using one reference we can safely call `SHOW TABLES`." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "jupyter": { - "outputs_hidden": false - }, - "pycharm": { - "name": "#%%\n" - } - }, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "\n" - ] - } - ], - "source": [ - "!nessie content list" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "And on the `dev` branch we expect to see two tables" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "DELTA_LAKE_TABLE:\n", - "\thome.jovyan.notebooks.spark_warehouse.delta.totals_stats._delta_log\n", - "\thome.jovyan.notebooks.spark_warehouse.delta.salaries._delta_log\n", - "\n" - ] - } - ], - "source": [ - "!nessie content list --ref dev" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "pycharm": { - "name": "#%% md\n" - } - }, - "source": [ - "We can also verify that the `dev` and `main` branches point to different commits" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [ - { - "data": { - "text/html": [ - "
\n", - "\n", - "\n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - "
refTypenamehash
0Branchdev54622696d1313cfcb012120083d917f65558f0906f73ab...
1Branchmain2e1cfa82b035c26cbbbdae632cea070514eb8b773f616a...
\n", - "
" - ], - "text/plain": [ - " refType name hash\n", - "0 Branch dev 54622696d1313cfcb012120083d917f65558f0906f73ab...\n", - "1 Branch main 2e1cfa82b035c26cbbbdae632cea070514eb8b773f616a..." - ] - }, - "execution_count": null, - "metadata": {}, - "output_type": "execute_result" - } - ], - "source": [ - "spark.sql(\"LIST REFERENCES\").toPandas()" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "Dev promotion into main\n", - "-----------------------\n", - "Once we are done with our changes on the `dev` branch, we would like to merge those changes into `main`.\n", - "We merge `dev` into `main` via the Spark sql `merge` command.\n", - "Both branches should be at the same revision after merging/promotion." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [ - { - "data": { - "text/html": [ - "
\n", - "\n", - "\n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - "
namehash
0maindee6e3ec017cd39a272ccd4599bc2f1d4679731bd7b921...
\n", - "
" - ], - "text/plain": [ - " name hash\n", - "0 main dee6e3ec017cd39a272ccd4599bc2f1d4679731bd7b921..." - ] - }, - "execution_count": null, - "metadata": {}, - "output_type": "execute_result" - } - ], - "source": [ - "spark.sql(\"MERGE BRANCH dev\").toPandas()" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "We can verify that the `main` branch now contains the expected tables and row counts.\n", - "\n", - "The tables are now on `main` and ready for consumption by our blog authors and analysts!" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [ - { - "data": { - "text/html": [ - "
\n", - "\n", - "\n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - "
refTypenamehash
0Branchmaindee6e3ec017cd39a272ccd4599bc2f1d4679731bd7b921...
1Branchdev54622696d1313cfcb012120083d917f65558f0906f73ab...
\n", - "
" - ], - "text/plain": [ - " refType name hash\n", - "0 Branch main dee6e3ec017cd39a272ccd4599bc2f1d4679731bd7b921...\n", - "1 Branch dev 54622696d1313cfcb012120083d917f65558f0906f73ab..." - ] - }, - "execution_count": null, - "metadata": {}, - "output_type": "execute_result" - } - ], - "source": [ - "spark.sql(\"LIST REFERENCES\").toPandas()" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "DELTA_LAKE_TABLE:\n", - "\thome.jovyan.notebooks.spark_warehouse.delta.salaries._delta_log\n", - "\thome.jovyan.notebooks.spark_warehouse.delta.totals_stats._delta_log\n", - "\n" - ] - } - ], - "source": [ - "!nessie content list" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "50\n", - "92\n" - ] - } - ], - "source": [ - "spark.sql(\"USE REFERENCE main\")\n", - "table_count = spark.sql(\"select count(*) from salaries\").toPandas().values[0][0]\n", - "csv_count = spark.sql(\"select count(*) from salaries_table\").toPandas().values[0][0]\n", - "assert table_count == csv_count\n", - "print(table_count)\n", - "\n", - "table_count = spark.sql(\"select count(*) from totals_stats\").toPandas().values[0][0]\n", - "csv_count = spark.sql(\"select count(*) from stats_table\").toPandas().values[0][0]\n", - "assert table_count == csv_count\n", - "print(table_count)" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "pycharm": { - "name": "#%% md\n" - } - }, - "source": [ - "Perform regular ETL on the new tables\n", - "-------------------\n", - "Our analysts are happy with the data and we want to now regularly ingest data to keep things up to date. Our first ETL job consists of the following:\n", - "\n", - "1. Update the salaries table to add new data\n", - "2. We add `Years` column to `totals_stats` table to show how many years the player was in the league\n", - "3. We create a new table to hold information about the players appearances in all star games\n", - "\n", - "As always we will do this work on a branch and verify the results. This ETL job can then be set up to run nightly with new stats and salary information." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [ - { - "data": { - "text/html": [ - "
\n", - "\n", - "\n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - "
refTypenamehash
0Branchetldee6e3ec017cd39a272ccd4599bc2f1d4679731bd7b921...
\n", - "
" - ], - "text/plain": [ - " refType name hash\n", - "0 Branch etl dee6e3ec017cd39a272ccd4599bc2f1d4679731bd7b921..." - ] - }, - "execution_count": null, - "metadata": {}, - "output_type": "execute_result" - } - ], - "source": [ - "spark.sql(\"CREATE BRANCH etl FROM main\").toPandas()" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "pycharm": { - "name": "#%%\n" - } - }, - "outputs": [ - { - "data": { - "text/html": [ - "
\n", - "\n", - "\n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - "
\n", - "
" - ], - "text/plain": [ - "Empty DataFrame\n", - "Columns: []\n", - "Index: []" - ] - }, - "execution_count": null, - "metadata": {}, - "output_type": "execute_result" - } - ], - "source": [ - "# add some salaries for Kevin Durant\n", - "spark.sql(\"USE REFERENCE etl\")\n", - "spark.sql(\n", - " \"\"\"INSERT INTO salaries VALUES\n", - " (\"2017-18\", \"Golden State Warriors\", \"$25000000\", \"Kevin Durant\"),\n", - " (\"2018-19\", \"Golden State Warriors\", \"$30000000\", \"Kevin Durant\"),\n", - " (\"2019-20\", \"Brooklyn Nets\", \"$37199000\", \"Kevin Durant\"),\n", - " (\"2020-21\", \"Brooklyn Nets\", \"$39058950\", \"Kevin Durant\")\n", - " \"\"\"\n", - ").toPandas()" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [ - { - "data": { - "text/html": [ - "
\n", - "\n", - "\n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - "
\n", - "
" - ], - "text/plain": [ - "Empty DataFrame\n", - "Columns: []\n", - "Index: []" - ] - }, - "execution_count": null, - "metadata": {}, - "output_type": "execute_result" - } - ], - "source": [ - "# Adding a column in the `totals_stats` table\n", - "spark.sql(\"ALTER TABLE totals_stats ADD COLUMNS (Years STRING)\").toPandas()" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [ - { - "data": { - "text/html": [ - "
\n", - "\n", - "\n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - "
count(1)
047
\n", - "
" - ], - "text/plain": [ - " count(1)\n", - "0 47" - ] - }, - "execution_count": null, - "metadata": {}, - "output_type": "execute_result" - } - ], - "source": [ - "# Creating `allstar_games_stats` table and viewing the contents\n", - "spark.sql(\n", - " \"\"\"CREATE TABLE IF NOT EXISTS allstar_games_stats (\n", - " Season STRING, Age STRING, Team STRING, ORB STRING, TRB STRING, AST STRING, STL STRING, BLK STRING,\n", - " TOV STRING, PF STRING, PTS STRING, Player STRING)\n", - " USING delta LOCATION '{}/allstar_stats'\"\"\".format(\n", - " warehouse\n", - " )\n", - ")\n", - "spark.sql(\n", - " \"\"\"CREATE OR REPLACE TEMPORARY VIEW allstar_table USING csv\n", - " OPTIONS (path \"../datasets/nba/allstar_games_stats.csv\", header true)\"\"\"\n", - ")\n", - "spark.sql(\"INSERT INTO allstar_games_stats SELECT * FROM allstar_table\").toPandas()\n", - "\n", - "# notice how we view the data on the etl branch via @etl\n", - "spark.sql(\"select count(*) from allstar_games_stats\").toPandas()" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "We can verify that the new table isn't on the `main` branch but is present on the etl branch" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "DELTA_LAKE_TABLE:\n", - "\thome.jovyan.notebooks.spark_warehouse.delta.salaries._delta_log\n", - "\thome.jovyan.notebooks.spark_warehouse.delta.totals_stats._delta_log\n", - "\n" - ] - } - ], - "source": [ - "!nessie content list" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "DELTA_LAKE_TABLE:\n", - "\thome.jovyan.notebooks.spark_warehouse.delta.allstar_stats._delta_log\n", - "\thome.jovyan.notebooks.spark_warehouse.delta.totals_stats._delta_log\n", - "\thome.jovyan.notebooks.spark_warehouse.delta.salaries._delta_log\n", - "\n" - ] - } - ], - "source": [ - "!nessie content list --ref etl" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "Now that we are happy with the data we can again merge it into `main`" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "jupyter": { - "outputs_hidden": false - }, - "pycharm": { - "name": "#%%\n" - } - }, - "outputs": [ - { - "data": { - "text/html": [ - "
\n", - "\n", - "\n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - "
namehash
0maincdf244cd4af77968becc0ebd0439efea6c6e6df8923ed9...
\n", - "
" - ], - "text/plain": [ - " name hash\n", - "0 main cdf244cd4af77968becc0ebd0439efea6c6e6df8923ed9..." - ] - }, - "execution_count": null, - "metadata": {}, - "output_type": "execute_result" - } - ], - "source": [ - "spark.sql(\"MERGE BRANCH etl\").toPandas()" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "Now lets verify that the changes exist on the `main` branch" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "jupyter": { - "outputs_hidden": false - }, - "pycharm": { - "name": "#%%\n" - } - }, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "DELTA_LAKE_TABLE:\n", - "\thome.jovyan.notebooks.spark_warehouse.delta.salaries._delta_log\n", - "\thome.jovyan.notebooks.spark_warehouse.delta.totals_stats._delta_log\n", - "\thome.jovyan.notebooks.spark_warehouse.delta.allstar_stats._delta_log\n", - "\n" - ] - } - ], - "source": [ - "!nessie content list" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [ - { - "data": { - "text/html": [ - "
\n", - "\n", - "\n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - "
refTypenamehash
0Branchmaincdf244cd4af77968becc0ebd0439efea6c6e6df8923ed9...
1Branchetl2d11823828ee539d7609e1a88083ada6f37d39362a4e3a...
2Branchdev54622696d1313cfcb012120083d917f65558f0906f73ab...
\n", - "
" - ], - "text/plain": [ - " refType name hash\n", - "0 Branch main cdf244cd4af77968becc0ebd0439efea6c6e6df8923ed9...\n", - "1 Branch etl 2d11823828ee539d7609e1a88083ada6f37d39362a4e3a...\n", - "2 Branch dev 54622696d1313cfcb012120083d917f65558f0906f73ab..." - ] - }, - "execution_count": null, - "metadata": {}, - "output_type": "execute_result" - } - ], - "source": [ - "spark.sql(\"LIST REFERENCES\").toPandas()" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "47\n" - ] - } - ], - "source": [ - "spark.sql(\"USE REFERENCE main\").toPandas()\n", - "table_count = spark.sql(\"select count(*) from allstar_games_stats\").toPandas().values[0][0]\n", - "csv_count = spark.sql(\"select count(*) from allstar_table\").toPandas().values[0][0]\n", - "assert table_count == csv_count\n", - "print(table_count)" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "Create `experiment` branch\n", - "--------------------------------\n", - "As a data analyst we might want to carry out some experiments with some data, without affecting `main` in any way.\n", - "As in the previous examples, we can just get started by creating an `experiment` branch off of `main`\n", - "and carry out our experiment, which could consist of the following steps:\n", - "- drop `totals_stats` table\n", - "- add data to `salaries` table\n", - "- compare `experiment` and `main` tables" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [ - { - "data": { - "text/html": [ - "
\n", - "\n", - "\n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - "
refTypenamehash
0Branchexperimentcdf244cd4af77968becc0ebd0439efea6c6e6df8923ed9...
\n", - "
" - ], - "text/plain": [ - " refType name hash\n", - "0 Branch experiment cdf244cd4af77968becc0ebd0439efea6c6e6df8923ed9..." - ] - }, - "execution_count": null, - "metadata": {}, - "output_type": "execute_result" - } - ], - "source": [ - "spark.sql(\"CREATE BRANCH experiment FROM main\").toPandas()\n", - "spark.sql(\"USE REFERENCE experiment\").toPandas()" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [ - { - "data": { - "text/html": [ - "
\n", - "\n", - "\n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - "
\n", - "
" - ], - "text/plain": [ - "Empty DataFrame\n", - "Columns: []\n", - "Index: []" - ] - }, - "execution_count": null, - "metadata": {}, - "output_type": "execute_result" - } - ], - "source": [ - "# Drop the `totals_stats` table on the `experiment` branch\n", - "spark.sql(\"DROP TABLE totals_stats\").toPandas()" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [ - { - "data": { - "text/html": [ - "
\n", - "\n", - "\n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - "
\n", - "
" - ], - "text/plain": [ - "Empty DataFrame\n", - "Columns: []\n", - "Index: []" - ] - }, - "execution_count": null, - "metadata": {}, - "output_type": "execute_result" - } - ], - "source": [ - "# add some salaries for Dirk Nowitzki\n", - "spark.sql(\n", - " \"\"\"INSERT INTO salaries VALUES\n", - " (\"2015-16\", \"Dallas Mavericks\", \"$8333333\", \"Dirk Nowitzki\"),\n", - " (\"2016-17\", \"Dallas Mavericks\", \"$25000000\", \"Dirk Nowitzki\"),\n", - " (\"2017-28\", \"Dallas Mavericks\", \"$5000000\", \"Dirk Nowitzki\"),\n", - " (\"2018-19\", \"Dallas Mavericks\", \"$5000000\", \"Dirk Nowitzki\")\n", - " \"\"\"\n", - ").toPandas()" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "DELTA_LAKE_TABLE:\n", - "\thome.jovyan.notebooks.spark_warehouse.delta.salaries._delta_log\n", - "\thome.jovyan.notebooks.spark_warehouse.delta.totals_stats._delta_log\n", - "\thome.jovyan.notebooks.spark_warehouse.delta.allstar_stats._delta_log\n", - "\n" - ] - } - ], - "source": [ - "!nessie content list --ref experiment" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "jupyter": { - "outputs_hidden": false - }, - "pycharm": { - "name": "#%%\n" - } - }, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "DELTA_LAKE_TABLE:\n", - "\thome.jovyan.notebooks.spark_warehouse.delta.salaries._delta_log\n", - "\thome.jovyan.notebooks.spark_warehouse.delta.totals_stats._delta_log\n", - "\thome.jovyan.notebooks.spark_warehouse.delta.allstar_stats._delta_log\n", - "\n" - ] - } - ], - "source": [ - "!nessie content list" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "Let's take a look at the contents of the `salaries` table on the `experiment` branch.\n", - "Notice the use of the `nessie` catalog and the use of `@experiment` to view data on the `experiment` branch" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [ - { - "data": { - "text/html": [ - "
\n", - "\n", - "\n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - "
count(1)
058
\n", - "
" - ], - "text/plain": [ - " count(1)\n", - "0 58" - ] - }, - "execution_count": null, - "metadata": {}, - "output_type": "execute_result" - } - ], - "source": [ - "spark.sql(\"select count(*) from salaries\").toPandas()" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "and compare to the contents of the `salaries` table on the `main` branch. Notice that we didn't have to specify `@branchName` as it defaulted\n", - "to the `main` branch" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "pycharm": { - "name": "#%%\n" - } - }, - "outputs": [ - { - "data": { - "text/html": [ - "
\n", - "\n", - "\n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - "
count(1)
054
\n", - "
" - ], - "text/plain": [ - " count(1)\n", - "0 54" - ] - }, - "execution_count": null, - "metadata": {}, - "output_type": "execute_result" - } - ], - "source": [ - "spark.sql(\"USE REFERENCE main\").toPandas()\n", - "spark.sql(\"select count(*) from salaries\").toPandas()" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "pycharm": { - "name": "#%% md\n" - } - }, - "source": [ - "And finally lets clean up after ourselves" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "pycharm": { - "name": "#%%\n" - } - }, - "outputs": [ - { - "data": { - "text/plain": [ - "DataFrame[status: string]" - ] - }, - "execution_count": null, - "metadata": {}, - "output_type": "execute_result" - } - ], - "source": [ - "spark.sql(\"DROP BRANCH dev\")\n", - "spark.sql(\"DROP BRANCH etl\")\n", - "spark.sql(\"DROP BRANCH experiment\")" - ] - } - ], - "metadata": { - "kernelspec": { - "display_name": "Python 3 (ipykernel)", - "language": "python", - "name": "python3" - }, - "language_info": { - "codemirror_mode": { - "name": "ipython", - "version": 3 - }, - "file_extension": ".py", - "mimetype": "text/x-python", - "name": "python", - "nbconvert_exporter": "python", - "pygments_lexer": "ipython3", - "version": "3.7.12" - } - }, - "nbformat": 4, - "nbformat_minor": 4 -} \ No newline at end of file diff --git a/notebooks/tests/test_nessie_delta_demo_nba.py b/notebooks/tests/test_nessie_delta_demo_nba.py deleted file mode 100644 index e870776e..00000000 --- a/notebooks/tests/test_nessie_delta_demo_nba.py +++ /dev/null @@ -1,100 +0,0 @@ -#!/usr/bin/env python -# -# Copyright (C) 2020 Dremio -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -"""Tests the Nessie + Iceberg + Spark Jupyter Notebook with the NBA dataset.""" -from typing import Iterator - -import pytest -from assertpy import assert_that -from assertpy.assertpy import AssertionBuilder -from testbook import testbook -from testbook.client import TestbookNotebookClient -from utils import fetch_spark - -from . import _find_notebook -from . import _remove_folders -from . import start_nessie - -num_salaries_on_experiment = """count(1) -0 58""" - -num_salaries_on_main = """count(1) -0 54""" - - -@pytest.fixture(scope="module") -def notebook() -> Iterator[TestbookNotebookClient]: - """Pytest fixture to run a notebook.""" - path_to_notebook = _find_notebook("nessie-delta-demo-nba.ipynb") - fetch_spark() - - with start_nessie(): - with testbook(path_to_notebook, timeout=300) as tb: - tb.execute() - yield tb - # Clean all the folders that being created by this test - _remove_folders(["spark-warehouse", "spark_warehouse"]) - - -def _assert_that_notebook( - text: str, notebook: TestbookNotebookClient, count: int = 0 -) -> AssertionBuilder: - for seen, value in enumerate( - n for n, i in enumerate(notebook.cells) if text in i["source"] - ): - if seen == count: - return assert_that(notebook.cell_output_text(value)) - raise Exception(f"Unable to find cell with text: {text}") - - -def test_notebook_output(notebook: TestbookNotebookClient) -> None: - """Runs through the entire notebook and checks the output. - - :param notebook: The notebook to test - :return: - """ - assertion = lambda x: _assert_that_notebook(x, notebook) # NOQA - assertion_counted = lambda x, y: _assert_that_notebook(x, notebook, y) # NOQA - - assertion("findspark.init").contains("Spark Running") - - assertion("CREATE BRANCH dev").contains("Branch").contains("dev") - - assertion("INSERT INTO totals_stats SELECT * FROM stats_table").is_equal_to( - "Empty DataFrame\nColumns: []\nIndex: []" - ) - - assertion_counted("LIST REFERENCES", 1).contains("main").contains("dev").contains( - "Branch" - ) - - assertion_counted( - 'spark.sql("select count(*) from salaries").toPandas()', 2 - ).is_equal_to(num_salaries_on_experiment) - - assertion_counted( - 'spark.sql("select count(*) from salaries").toPandas()', 3 - ).is_equal_to(num_salaries_on_main) - - -def test_dependency_setup(notebook: TestbookNotebookClient) -> None: - """Verifies that dependencies were correctly set up. - - :param notebook: The notebook to test - :return: - """ - spark = notebook.ref("spark") - assert_that(spark).is_not_none()