Several warehouses #9811

Open
ErikJansenIRefact opened this issue Oct 24, 2024 · 1 comment
Issue description

I started a Nessie server in Docker with the following environment variables:
- QUARKUS_PROFILE=prod
- QUARKUS_HTTP_PORT=19120
- QUARKUS_LOG_CONSOLE_FORMAT=%d{yyyy-MM-dd HH:mm:ss} %-5p [%c{1.}] (%t) %s%e%n
- QUARKUS_LOG_LEVEL=INFO
- QUARKUS_DATASOURCE_DB_KIND=rocksdb
- QUARKUS_DATASOURCE_JDBC_URL=jdbc:rocksdb:file:///nessie/data
- QUARKUS_DATASOURCE_USERNAME=nessie
- QUARKUS_DATASOURCE_PASSWORD=nessie
- nessie.catalog.default-warehouse=warehouse
- nessie.catalog.warehouses.warehouse.location=s3://warehouse
- nessie.catalog.warehouses.humanresource.location=s3://humanresource

In Spark we now want to create a table in each warehouse: warehouse and humanresource. The namespace and table name are the same for both warehouses: sales.sales_data_raw.

In the first Spark session the warehouse is set to s3://warehouse. The namespace and table are created at the expected location, and the metadata files are written as well.

In a second Spark session the warehouse is set to s3://humanresource. The same namespace and table are created and written. But instead of writing to s3://humanresource, the data is written to s3://warehouse. What am I doing wrong?

This is the Spark code:
```python
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType
import os

# DEFINE SENSITIVE VARIABLES
CATALOG_URI = "http://nessie:19120/api/v1"  # Nessie Server URI
WAREHOUSE = "s3://humanresource"            # Minio address to write to
STORAGE_URI = "http://172.18.0.3:9000"      # Minio IP address from docker inspect

# Configure Spark with necessary packages and Iceberg/Nessie settings
conf = (
    pyspark.SparkConf()
    .setAppName('sales_data_app')
    # Include necessary packages
    .set('spark.jars.packages', 'org.postgresql:postgresql:42.7.3,org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.5.0,org.projectnessie.nessie-integrations:nessie-spark-extensions-3.5_2.12:0.77.1,software.amazon.awssdk:bundle:2.24.8,software.amazon.awssdk:url-connection-client:2.24.8')
    # Enable Iceberg and Nessie extensions
    .set('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,org.projectnessie.spark.extensions.NessieSparkSessionExtensions')
    # Configure Nessie catalog
    .set('spark.sql.catalog.nessie', 'org.apache.iceberg.spark.SparkCatalog')
    .set('spark.sql.catalog.nessie.uri', CATALOG_URI)
    .set('spark.sql.catalog.nessie.ref', 'main')
    .set('spark.sql.catalog.nessie.authentication.type', 'NONE')
    .set('spark.sql.catalog.nessie.catalog-impl', 'org.apache.iceberg.nessie.NessieCatalog')
    # Set Minio as the S3 endpoint for Iceberg storage
    .set('spark.sql.catalog.nessie.s3.endpoint', STORAGE_URI)
    .set('spark.sql.catalog.nessie.warehouse', WAREHOUSE)
    .set('spark.sql.catalog.nessie.io-impl', 'org.apache.iceberg.aws.s3.S3FileIO')
)

# Start Spark session
spark = SparkSession.builder.config(conf=conf).getOrCreate()
print("Spark Session Started")

# Define a schema for the sales data
schema = StructType([
    StructField("order_id", IntegerType(), True),
    StructField("customer_id", IntegerType(), True),
    StructField("product", StringType(), True),
    StructField("quantity", IntegerType(), True),
    StructField("price", DoubleType(), True),
    StructField("order_date", StringType(), True)
])

# Create a DataFrame with messy sales data (including duplicates and errors)
sales_data = [
    (1, 101, "Laptop", 1, 1000.00, "2023-08-01"),
    (2, 102, "Mouse", 2, 25.50, "2023-08-01"),
    (3, 103, "Keyboard", 1, 45.00, "2023-08-01"),
    (1, 101, "Laptop", 1, 1000.00, "2023-08-01"),     # Duplicate
    (4, 104, "Monitor", None, 200.00, "2023-08-02"),  # Missing quantity
    (5, None, "Mouse", 1, 25.50, "2023-08-02")        # Missing customer_id
]

# Convert the data into a DataFrame
sales_df = spark.createDataFrame(sales_data, schema)

# Create the "sales" namespace
spark.sql("CREATE NAMESPACE IF NOT EXISTS nessie.sales;").show()

# Write the DataFrame to an Iceberg table in the Nessie catalog
sales_df.writeTo("nessie.sales.sales_data_raw").createOrReplace()

# Verify by reading from the Iceberg table
spark.read.table("nessie.sales.sales_data_raw").show()

# Stop the Spark session
spark.stop()
```
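To confirm which location the second session actually resolves the table to, the table can be inspected from the same SparkSession. A small diagnostic sketch (it reuses the `spark` session and table name from the script above):

```python
# Diagnostic sketch: DESCRIBE TABLE EXTENDED lists the Iceberg table's
# resolved location among the table details (reuses `spark` from above).
spark.sql("DESCRIBE TABLE EXTENDED nessie.sales.sales_data_raw").show(truncate=False)
```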


adutra commented Dec 3, 2024

Hi, a few things I noticed in your setup:

  • You are using the Java-based org.apache.iceberg.nessie.NessieCatalog – with this catalog impl, all writes happen on the client (engine) side, using the warehouse configured with the warehouse property. Nessie's role in this case is only to store the metadata location pointers; it doesn't access the object storage layer at all.
  • For this reason, the following properties are ineffective when using NessieCatalog; they only take effect when using Iceberg's RESTCatalog instead (see the sketch below):
    • nessie.catalog.default-warehouse=warehouse
    • nessie.catalog.warehouses.warehouse.location=s3://warehouse
    • nessie.catalog.warehouses.humanresource.location=s3://humanresource

> In a second Spark session the warehouse is set to s3://humanresource. The same namespace and table are created and written. But instead of writing to s3://humanresource, the data is written to s3://warehouse. What am I doing wrong?

Hmm, this is a mystery to me. As I said, when using NessieCatalog the only location that matters is the one you provide with the warehouse property. Are you maybe using RESTCatalog instead?
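For reference, a minimal sketch of a Spark configuration that uses Iceberg's RESTCatalog against Nessie's Iceberg REST endpoint, so that the server-side nessie.catalog.* warehouse settings apply. The endpoint path (/iceberg/main) and the use of the warehouse name (humanresource) instead of an S3 URI are assumptions based on the configuration above and may need adjusting for your Nessie version:

```python
import pyspark
from pyspark.sql import SparkSession

# Sketch only: point the Spark catalog at Nessie's Iceberg REST API so that
# warehouses defined on the server (nessie.catalog.warehouses.*) are resolved
# server-side. Endpoint path and warehouse name are assumptions.
conf = (
    pyspark.SparkConf()
    .setAppName('sales_data_app_rest')
    .set('spark.jars.packages',
         'org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.5.0')
    .set('spark.sql.extensions',
         'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions')
    .set('spark.sql.catalog.nessie', 'org.apache.iceberg.spark.SparkCatalog')
    # RESTCatalog instead of NessieCatalog
    .set('spark.sql.catalog.nessie.type', 'rest')
    .set('spark.sql.catalog.nessie.uri', 'http://nessie:19120/iceberg/main')
    # Refer to the warehouse by its server-side name, not by its S3 location
    .set('spark.sql.catalog.nessie.warehouse', 'humanresource')
)

spark = SparkSession.builder.config(conf=conf).getOrCreate()
spark.sql("CREATE NAMESPACE IF NOT EXISTS nessie.sales").show()
```

With RESTCatalog, the warehouse location is resolved by the Nessie server from its nessie.catalog.warehouses.* configuration rather than by the client.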
