### Spark Configuration (Best Practices)
```python
# Enable Fabric optimizations
spark.conf.set("spark.sql.parquet.vorder.enabled", "true")
spark.conf.set("spark.microsoft.delta.optimizeWrite.enabled", "true")
```
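These settings are session-scoped, so it can be worth confirming they took effect before writing data; a minimal check using the standard `spark.conf.get` API:
```python
# Confirm the session-level settings took effect
print(spark.conf.get("spark.sql.parquet.vorder.enabled"))
print(spark.conf.get("spark.microsoft.delta.optimizeWrite.enabled"))
```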
### Reading Data
```python
# Read CSV file
df = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("Files/bronze/data.csv")

# Read JSON file
df = spark.read.format("json").load("Files/bronze/data.json")

# Read Parquet file
df = spark.read.format("parquet").load("Files/bronze/data.parquet")

# Read Delta table
df = spark.read.table("my_delta_table")

# Read with Spark SQL
df = spark.sql("SELECT * FROM lakehouse.my_table")
```
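Whole folders can be read in one pass; a sketch using a glob path and an explicit delimiter (the path and delimiter here are illustrative, adjust to your files):
```python
# Read every CSV in the bronze folder with a non-default delimiter
df = spark.read.format("csv") \
    .option("header", "true") \
    .option("delimiter", ";") \
    .load("Files/bronze/*.csv")
```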
### Writing Delta Tables
```python
# Write DataFrame as managed Delta table
df.write.format("delta") \
    .mode("overwrite") \
    .saveAsTable("silver_customers")

# Write with partitioning
df.write.format("delta") \
    .mode("overwrite") \
    .partitionBy("year", "month") \
    .saveAsTable("silver_transactions")

# Append to existing table
df.write.format("delta") \
    .mode("append") \
    .saveAsTable("silver_events")
```
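If the incoming schema changes, a plain overwrite typically fails with a schema-mismatch error; Delta's `overwriteSchema` and `mergeSchema` write options cover that case. A sketch reusing the table names above:
```python
# Overwrite and replace the table schema when columns have changed
df.write.format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .saveAsTable("silver_customers")

# Append while allowing new columns to be added to the table
df.write.format("delta") \
    .mode("append") \
    .option("mergeSchema", "true") \
    .saveAsTable("silver_events")
```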
### Delta Table Operations (CRUD)
```python
# UPDATE
spark.sql("""
    UPDATE silver_customers
    SET status = 'active'
    WHERE last_login > '2024-01-01'  -- Example date, adjust as needed
""")

# DELETE
spark.sql("""
    DELETE FROM silver_customers
    WHERE is_deleted = true
""")

# MERGE (Upsert)
spark.sql("""
    MERGE INTO silver_customers AS target
    USING staging_customers AS source
    ON target.customer_id = source.customer_id
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
""")
```
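The same operations are available through the `DeltaTable` Python API if you prefer to avoid SQL strings; a sketch against the same tables, assuming `staging_customers` exists as a table:
```python
from delta.tables import DeltaTable

customers = DeltaTable.forName(spark, "silver_customers")

# UPDATE
customers.update(
    condition="last_login > '2024-01-01'",
    set={"status": "'active'"}
)

# DELETE
customers.delete("is_deleted = true")

# MERGE (upsert) from a staging DataFrame
staging_df = spark.read.table("staging_customers")
customers.alias("target") \
    .merge(staging_df.alias("source"), "target.customer_id = source.customer_id") \
    .whenMatchedUpdateAll() \
    .whenNotMatchedInsertAll() \
    .execute()
```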
### Schema Definition
```python
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType, DecimalType
schema = StructType([
    StructField("id", IntegerType(), False),
    StructField("name", StringType(), True),
    StructField("email", StringType(), True),
    StructField("amount", DecimalType(18, 2), True),
    StructField("created_at", TimestampType(), True)
])

df = spark.read.format("csv") \
    .schema(schema) \
    .option("header", "true") \
    .load("Files/bronze/customers.csv")
```
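With the default `PERMISSIVE` mode, rows that do not match the declared schema are silently read with nulls; the CSV reader's `mode` option can be set to `FAILFAST` (or `DROPMALFORMED`) to surface them instead. A sketch reusing the schema above:
```python
# Fail the read if any row does not conform to the declared schema
df_strict = spark.read.format("csv") \
    .schema(schema) \
    .option("header", "true") \
    .option("mode", "FAILFAST") \
    .load("Files/bronze/customers.csv")
```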
### SQL Magic in Notebooks
```sql
%%sql
-- Query Delta table directly
SELECT
    customer_id,
    COUNT(*) AS order_count,
    SUM(amount) AS total_amount
FROM gold_orders
GROUP BY customer_id
ORDER BY total_amount DESC
LIMIT 10
```
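The same query can be run from a Python cell when the result needs to land in a DataFrame for further processing:
```python
# Same query from PySpark, keeping the result as a DataFrame
top_customers = spark.sql("""
    SELECT customer_id, COUNT(*) AS order_count, SUM(amount) AS total_amount
    FROM gold_orders
    GROUP BY customer_id
    ORDER BY total_amount DESC
    LIMIT 10
""")
top_customers.show()
```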
### V-Order Optimization
```python
# Enable V-Order (applied at write time; speeds up subsequent reads across Fabric engines)
spark.conf.set("spark.sql.parquet.vorder.enabled", "true")
```
### Table Optimization
```sql
%%sql
-- Optimize table (compact small files)
OPTIMIZE silver_transactions;

-- Optimize with Z-ordering on frequently filtered columns
OPTIMIZE silver_transactions ZORDER BY (customer_id, transaction_date);

-- Vacuum old files (default 7 days retention)
VACUUM silver_transactions;

-- Vacuum with explicit retention (168 hours = 7 days)
VACUUM silver_transactions RETAIN 168 HOURS
```
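The same maintenance operations are exposed on the `DeltaTable` Python API in recent Delta releases, which is convenient inside scheduled notebooks; a sketch:
```python
from delta.tables import DeltaTable

transactions = DeltaTable.forName(spark, "silver_transactions")

# Compact small files
transactions.optimize().executeCompaction()

# Z-order by the most common filter columns
transactions.optimize().executeZOrderBy("customer_id", "transaction_date")

# Remove files older than the retention window (in hours)
transactions.vacuum(168)
```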
### Incremental Load Pattern
```python
from pyspark.sql.functions import col

# Get the last processed watermark (None if silver_orders is empty)
last_watermark = spark.sql("""
    SELECT MAX(processed_timestamp) AS watermark
    FROM silver_orders
""").collect()[0]["watermark"]

# Load only records created after the watermark
new_records = spark.read.table("bronze_orders") \
    .filter(col("created_at") > last_watermark)

# Merge new records into the silver table
new_records.createOrReplaceTempView("staging_orders")
spark.sql("""
    MERGE INTO silver_orders AS target
    USING staging_orders AS source
    ON target.order_id = source.order_id
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
""")
```
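On the very first run `silver_orders` is empty, `MAX()` returns `None`, and the filter above matches nothing. One way to handle that is a fallback placed right after the watermark query (the floor date is a hypothetical value, adjust to your data):
```python
# First-run fallback: seed the watermark when the target table is still empty
if last_watermark is None:
    last_watermark = "1900-01-01 00:00:00"  # hypothetical floor, adjust as needed
```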
### SCD Type 2 Pattern
```python
# Close out current records for customers arriving in staging
spark.sql("""
    UPDATE dim_customer
    SET is_current = false, end_date = current_timestamp()
    WHERE customer_id IN (SELECT customer_id FROM staging_customer)
      AND is_current = true
""")

# Insert new versions as the current records
spark.sql("""
    INSERT INTO dim_customer
    SELECT
        customer_id,
        name,
        email,
        address,
        current_timestamp() AS start_date,
        null AS end_date,
        true AS is_current
    FROM staging_customer
""")
```
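Because the close-out and insert run as two separate statements, a quick consistency check after the load can catch duplicated current rows; a sketch assuming the table and flag names above:
```python
# Sanity check: each customer should have at most one current row
spark.sql("""
    SELECT customer_id, COUNT(*) AS current_rows
    FROM dim_customer
    WHERE is_current = true
    GROUP BY customer_id
    HAVING COUNT(*) > 1
""").show()
```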