Notebook examples #89

Open · wants to merge 8 commits into base: master
Changes from 7 commits
3 changes: 2 additions & 1 deletion README.md
@@ -14,7 +14,8 @@ Operations are applied concurrently across multiple tables
 * [VACUUM all tables](docs/Vacuum.md) ([example notebook](examples/vacuum_multiple_tables.py))
 * OPTIMIZE with z-order on tables having specified columns
 * Detect tables having too many small files ([example notebook](examples/detect_small_files.py))
-* Visualise quantity of data written per table per period
+* Visualise quantity of data written per table per period ([example notebook](examples/table_freshness.py))
+* Check delta protocol versions ([example notebook](examples/check_delta_protocol_version.py))
 * **Governance**
 * PII detection with Presidio ([example notebook](examples/pii_detection_presidio.py))
 * Text Analysis with MosaicML and Databricks MLflow ([example notebook](examples/text_analysis_mosaicml_mlflow.py))
90 changes: 90 additions & 0 deletions examples/check_delta_protocol_version.py
@@ -0,0 +1,90 @@
# Databricks notebook source
# MAGIC %md
# MAGIC # Check delta protocol version
# MAGIC
# MAGIC This notebook checks the Delta read and write protocol versions of multiple tables.
# MAGIC
# MAGIC Feature compatibility between Delta Lake versions is managed through the [read protocol](https://docs.delta.io/latest/versioning.html#read-protocol) and [write protocol](https://docs.delta.io/latest/versioning.html#write-protocol).
# MAGIC
# MAGIC Check out the [feature by protocol version table](https://docs.delta.io/latest/versioning.html#features-by-protocol-version) for more details.

# COMMAND ----------

# %pip install dbl-discoverx

# COMMAND ----------

dbutils.widgets.text("from_tables", "sample_data_discoverx.*.*")
from_tables = dbutils.widgets.get("from_tables")


# COMMAND ----------

from discoverx import DX

dx = DX()


# COMMAND ----------

dx.from_tables(from_tables)\
.with_sql("SHOW TBLPROPERTIES {full_table_name}")\
.apply()\
.filter('key = "delta.minReaderVersion" OR key = "delta.minWriterVersion"')\
.display()
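
# COMMAND ----------

# MAGIC %md
# MAGIC For comparison, the same check can be done for a single table with plain Spark SQL.
# MAGIC The commented snippet below is an illustrative sketch; `my_catalog.my_schema.my_table` is a placeholder table name.

# COMMAND ----------

# spark.sql("SHOW TBLPROPERTIES my_catalog.my_schema.my_table") \
#     .filter("key IN ('delta.minReaderVersion', 'delta.minWriterVersion')") \
#     .display()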

# COMMAND ----------

# MAGIC %md
# MAGIC ## Show delta feature compatibility

# COMMAND ----------

from pyspark.sql.functions import col, expr

result = (dx.from_tables(from_tables)
.with_sql("SHOW TBLPROPERTIES {full_table_name}")
.apply()
.filter('key = "delta.minReaderVersion" OR key = "delta.minWriterVersion"')
.withColumn("value", col("value").cast("int"))
.groupBy("table_catalog", "table_schema", "table_name")
.pivot("key", ["delta.minWriterVersion", "delta.minReaderVersion"])
.min("value")
.withColumnRenamed("delta.minReaderVersion", "minReaderVersion")
.withColumnRenamed("delta.minWriterVersion", "minWriterVersion")
.withColumn("supports_basic_functionality", expr("minWriterVersion >= 2 AND minReaderVersion >= 1"))
.withColumn("supports_check_constraint", expr("minWriterVersion >= 3 AND minReaderVersion >= 1"))
.withColumn("supports_change_data_feed", expr("minWriterVersion >= 4 AND minReaderVersion >= 1"))
.withColumn("supports_generated_columns", expr("minWriterVersion >= 4 AND minReaderVersion >= 1"))
.withColumn("supports_column_mapping", expr("minWriterVersion >= 5 AND minReaderVersion >= 2"))
.withColumn("supports_table_features_read", expr("minWriterVersion >= 7 AND minReaderVersion >= 1"))
.withColumn("supports_table_features_write", expr("minWriterVersion >= 7 AND minReaderVersion >= 3"))
.withColumn("supports_deletion_vectors", expr("minWriterVersion >= 7 AND minReaderVersion >= 3"))
.withColumn("supports_timestamp_without_timezone", expr("minWriterVersion >= 7 AND minReaderVersion >= 3"))
.withColumn("supports_iceberg_compatibilty_v1", expr("minWriterVersion >= 7 AND minReaderVersion >= 2"))
.withColumn("supports_v2_checkpoints", expr("minWriterVersion >= 7 AND minReaderVersion >= 3"))
)

result.display()
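
# COMMAND ----------

# MAGIC %md
# MAGIC As an illustrative follow-up (not part of the original notebook), the boolean flags above
# MAGIC can be used to single out tables that do not yet support a given feature, e.g. column mapping.

# COMMAND ----------

# Tables that do not yet support column mapping (uses the `result` DataFrame from the previous cell)
result.filter(~col("supports_column_mapping")).display()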

# COMMAND ----------

# MAGIC %md
# MAGIC ## Update protocol version
# MAGIC
# MAGIC You can update the table protocol read and write versions by uncommenting the following snippet.
# MAGIC
# MAGIC !!! BE CAREFUL !!!
# MAGIC
# MAGIC Upgrading a reader or writer version might impact the ability of older DBR versions to read or write the tables. Check [this page](https://docs.databricks.com/en/delta/feature-compatibility.html#what-delta-lake-features-require-databricks-runtime-upgrades) for more details.

# COMMAND ----------

# (dx.from_tables(from_tables)
# .with_sql("ALTER TABLE {full_table_name} SET TBLPROPERTIES('delta.minWriterVersion' = '5', 'delta.minReaderVersion' = '2')")
# .apply()
# )

# COMMAND ----------


95 changes: 95 additions & 0 deletions examples/slide_deck_examples.py
@@ -0,0 +1,95 @@
# Databricks notebook source
# MAGIC %pip install dbl-discoverx

# COMMAND ----------

dbutils.widgets.text("from_tables", "sample_data_discoverx.*.*")
from_tables = dbutils.widgets.get("from_tables")

# COMMAND ----------

from discoverx import DX

dx = DX()

# COMMAND ----------

# MAGIC %md
# MAGIC ## What are the 10 biggest tables in the "sample_data_discoverx" catalog?
# MAGIC
# MAGIC

# COMMAND ----------

from pyspark.sql.functions import col

(dx
.from_tables("sample_data_discoverx.*.*")
.with_sql("DESCRIBE DETAIL {full_table_name}")
.apply()
.orderBy(col("sizeInBytes").desc())
# Keep only the 10 largest tables, as the question above asks
.limit(10)
.display()
)

# COMMAND ----------

# MAGIC %md
# MAGIC ## Which tables have the most daily transactions?

# COMMAND ----------

from pyspark.sql.functions import col, window

(dx
.from_tables("sample_data_discoverx.*.*")
.with_sql("DESCRIBE HISTORY {full_table_name}")
.apply()
.groupBy("table_catalog", "table_schema", "table_name", window("timestamp", "1 day"))
.count()
# Tables with the most transactions per day first
.orderBy(col("count").desc())
.display()
)

# COMMAND ----------

# MAGIC %md
# MAGIC ## Which tables have too many small files?

# COMMAND ----------

from pyspark.sql.functions import col, lit

(dx
.from_tables("sample_data_discoverx.*.*")
.with_sql("DESCRIBE DETAIL {full_table_name}")
.apply()
# Average file size per table
.withColumn("average_file_size", col("sizeInBytes") / col("numFiles"))
# Flag tables with more than 100 files that average under 10 MB each
.withColumn("has_many_small_files",
    (col("average_file_size") < 10000000) & (col("numFiles") > 100))
.orderBy("average_file_size")
.display()
)
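
# COMMAND ----------

# MAGIC %md
# MAGIC An optional, illustrative variation (not in the original notebook): return only the flagged
# MAGIC tables instead of the full list.

# COMMAND ----------

(dx
.from_tables("sample_data_discoverx.*.*")
.with_sql("DESCRIBE DETAIL {full_table_name}")
.apply()
.withColumn("average_file_size", col("sizeInBytes") / col("numFiles"))
# Keep only tables with more than 100 files averaging under 10 MB each
.filter((col("average_file_size") < 10000000) & (col("numFiles") > 100))
.display()
)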

# COMMAND ----------

# MAGIC %md
# MAGIC ## Which tables contain email addresses?

# COMMAND ----------

result = (dx
.from_tables("sample_data_discoverx.*.*")
.scan() # Returns a Discovery object
)

# COMMAND ----------

# MAGIC %md
# MAGIC ## Which tables contain the email address “[email protected]”?

# COMMAND ----------

result.search("[email protected]").display()

# COMMAND ----------


103 changes: 103 additions & 0 deletions examples/table_freshness.py
@@ -0,0 +1,103 @@
# Databricks notebook source
# MAGIC %pip install dbl-discoverx

# COMMAND ----------

dbutils.widgets.text("from_tables", "sample_data_discoverx.*.*")
from_tables = dbutils.widgets.get("from_tables")
dbutils.widgets.text("time_span", "1 day")
time_span = dbutils.widgets.get("time_span")

# COMMAND ----------

from discoverx import DX

dx = DX()

# COMMAND ----------

# MAGIC %md
# MAGIC ## Number of delta table versions per time period

# COMMAND ----------

from pyspark.sql.functions import window, count

(dx
.from_tables(from_tables)
.with_sql("DESCRIBE HISTORY {full_table_name}")
.apply()
.groupBy("table_catalog", "table_schema", "table_name", window("timestamp", time_span))
.agg(count("*").alias("delta_versions_count"))
.display()
)

# COMMAND ----------

# MAGIC %md
# MAGIC ## Number of processed rows

# COMMAND ----------

sql_template = f"""
WITH metrics AS (
SELECT timestamp, operation, explode(operationMetrics) AS (metric, value)
FROM (
DESCRIBE HISTORY {{full_table_name}}
)
),

metrics_window AS (
SELECT window(timestamp, '{time_span}') AS time_window, metric, sum(value) as total_rows
FROM metrics
WHERE metric IN (
-- Written
"numCopiedRows",
"numUpdatedRows",
"numOutputRows",
-- Deleted
"numDeletedRows",
"numTargetRowsDeleted"
)
GROUP BY 1, 2
),

metrics_pivot AS (
SELECT *
FROM metrics_window
PIVOT (sum(total_rows) as total_rows
FOR (metric) IN (
-- Written
"numCopiedRows",
"numUpdatedRows",
"numOutputRows",

-- Deleted
"numDeletedRows",
"numTargetRowsDeleted"
)
)
)

SELECT
time_window,
-- Written rows include copied, updated and added rows
(COALESCE(numCopiedRows, 0) + COALESCE(numUpdatedRows, 0) + COALESCE(numOutputRows, 0)) AS totNumWrittenRows,
-- Deleted rows from both delete and merge operations
(COALESCE(numDeletedRows, 0) + COALESCE(numTargetRowsDeleted, 0)) AS totNumDeletedRows
FROM metrics_pivot
"""

processed_rows = (dx
.from_tables(from_tables)
.with_sql(sql_template)
.apply()
).toPandas()

# COMMAND ----------

display(processed_rows)
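
# COMMAND ----------

# MAGIC %md
# MAGIC As an optional, illustrative step (assumes `matplotlib` is available on the cluster),
# MAGIC the written rows from the query above can be plotted per time window.

# COMMAND ----------

import matplotlib.pyplot as plt

plot_df = processed_rows.copy()
# `time_window` arrives from Spark as a struct (Row) with `start` and `end` fields
plot_df["window_start"] = plot_df["time_window"].apply(lambda w: w["start"] if w is not None else None)
(plot_df
.groupby("window_start")["totNumWrittenRows"]
.sum()
.plot(kind="bar", figsize=(12, 6), title="Rows written per period"))
plt.show()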

# COMMAND ----------

