### Spark Configuration (Best Practices)

```python
# Enable Fabric optimizations
spark.conf.set("spark.sql.parquet.vorder.enabled", "true")
spark.conf.set("spark.microsoft.delta.optimizeWrite.enabled", "true")
```

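Both settings are session-scoped; a minimal check with `spark.conf.get` to confirm they took effect in the current notebook session:

```python
# Confirm the session-level settings (set above) are active
print(spark.conf.get("spark.sql.parquet.vorder.enabled"))
print(spark.conf.get("spark.microsoft.delta.optimizeWrite.enabled"))
```
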
### Reading Data

```python
# Read CSV file
df = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("Files/bronze/data.csv")

# Read JSON file
df = spark.read.format("json").load("Files/bronze/data.json")

# Read Parquet file
df = spark.read.format("parquet").load("Files/bronze/data.parquet")

# Read Delta table
df = spark.read.table("my_delta_table")

# Query a lakehouse table with Spark SQL
df = spark.sql("SELECT * FROM lakehouse.my_table")
```

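Delta tables can also be read as of an earlier version (time travel). A minimal sketch using Delta's SQL time-travel syntax, assuming `my_delta_table` has at least one prior version:

```python
# List the table's versions, then read version 0 (time travel)
spark.sql("DESCRIBE HISTORY my_delta_table").show()
df_v0 = spark.sql("SELECT * FROM my_delta_table VERSION AS OF 0")
```
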
### Writing Delta Tables

```python
# Write DataFrame as managed Delta table
df.write.format("delta") \
    .mode("overwrite") \
    .saveAsTable("silver_customers")

# Write with partitioning
df.write.format("delta") \
    .mode("overwrite") \
    .partitionBy("year", "month") \
    .saveAsTable("silver_transactions")

# Append to existing table
df.write.format("delta") \
    .mode("append") \
    .saveAsTable("silver_events")
```

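An append fails if the incoming DataFrame carries columns the table does not have yet. A sketch of opting in to schema evolution with the standard Delta `mergeSchema` write option, assuming the new columns are intentional:

```python
# Allow this append to add new columns to the table schema
df.write.format("delta") \
    .mode("append") \
    .option("mergeSchema", "true") \
    .saveAsTable("silver_events")
```
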
### Delta Table Operations (CRUD)

```python
# UPDATE
spark.sql("""
    UPDATE silver_customers
    SET status = 'active'
    WHERE last_login > '2024-01-01'  -- Example date, adjust as needed
""")

# DELETE
spark.sql("""
    DELETE FROM silver_customers
    WHERE is_deleted = true
""")

# MERGE (Upsert)
spark.sql("""
    MERGE INTO silver_customers AS target
    USING staging_customers AS source
    ON target.customer_id = source.customer_id
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
""")
```

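The same upsert can be written with the `DeltaTable` Python API instead of SQL, which avoids the temp-view step; a sketch assuming `source_df` holds the staging rows:

```python
from delta.tables import DeltaTable

# Equivalent of the MERGE statement above, via delta-spark
target = DeltaTable.forName(spark, "silver_customers")
(target.alias("t")
    .merge(source_df.alias("s"), "t.customer_id = s.customer_id")
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute())
```
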
### Schema Definition

```python
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType, DecimalType

schema = StructType([
    StructField("id", IntegerType(), False),
    StructField("name", StringType(), True),
    StructField("email", StringType(), True),
    StructField("amount", DecimalType(18, 2), True),
    StructField("created_at", TimestampType(), True)
])

df = spark.read.format("csv") \
    .schema(schema) \
    .option("header", "true") \
    .load("Files/bronze/customers.csv")
```

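Under the default PERMISSIVE parse mode, rows that do not match the schema are loaded with nulls rather than rejected. A sketch of failing fast instead, using the standard CSV reader `mode` option:

```python
# Abort the read on the first row that violates the schema
df = spark.read.format("csv") \
    .schema(schema) \
    .option("header", "true") \
    .option("mode", "FAILFAST") \
    .load("Files/bronze/customers.csv")
```
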
### SQL Magic in Notebooks

```sql
%%sql
-- Query Delta table directly
SELECT
    customer_id,
    COUNT(*) AS order_count,
    SUM(amount) AS total_amount
FROM gold_orders
GROUP BY customer_id
ORDER BY total_amount DESC
LIMIT 10
```

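When the result needs to feed further Python processing, the same query can run through `spark.sql` in a code cell; a minimal sketch:

```python
# Same aggregation as the SQL cell, returned as a DataFrame
top_customers = spark.sql("""
    SELECT customer_id, COUNT(*) AS order_count, SUM(amount) AS total_amount
    FROM gold_orders
    GROUP BY customer_id
    ORDER BY total_amount DESC
    LIMIT 10
""")
top_customers.show()
```
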
### V-Order Optimization

```python
# Enable V-Order (applied at write time; speeds up subsequent reads)
spark.conf.set("spark.sql.parquet.vorder.enabled", "true")
```

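V-Order can also be pinned per table rather than per session; a sketch assuming the `delta.parquet.vorder.enabled` table property described in the Fabric V-Order documentation:

```python
# Enable V-Order for one table regardless of session settings
# (property name per the Fabric V-Order docs; verify for your runtime)
spark.sql("""
    ALTER TABLE silver_transactions
    SET TBLPROPERTIES ('delta.parquet.vorder.enabled' = 'true')
""")
```
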
### Table Optimization

```sql
%%sql
-- Optimize table (compact small files)
OPTIMIZE silver_transactions;

-- Optimize with Z-ordering on frequently queried columns
OPTIMIZE silver_transactions ZORDER BY (customer_id, transaction_date);

-- Vacuum old files (default retention is 7 days)
VACUUM silver_transactions;

-- Vacuum with an explicit retention window (168 hours = 7 days)
VACUUM silver_transactions RETAIN 168 HOURS
```

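Retention windows shorter than the 7-day default are rejected by a Delta safety check unless it is explicitly disabled; a sketch, with the caveat that vacuumed versions can no longer be time traveled to:

```python
# Required before vacuuming below the 7-day default retention
spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled", "false")
spark.sql("VACUUM silver_transactions RETAIN 24 HOURS")
```
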
### Incremental Load Pattern

```python
from pyspark.sql.functions import col

# Get last processed watermark
last_watermark = spark.sql("""
    SELECT MAX(processed_timestamp) AS watermark
    FROM silver_orders
""").collect()[0]["watermark"]

# Load only new records
new_records = spark.read.table("bronze_orders") \
    .filter(col("created_at") > last_watermark)

# Merge new records
new_records.createOrReplaceTempView("staging_orders")
spark.sql("""
    MERGE INTO silver_orders AS target
    USING staging_orders AS source
    ON target.order_id = source.order_id
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
""")
```

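On the first run `silver_orders` is empty, so `MAX(processed_timestamp)` returns NULL and the filter drops every row. A minimal guard, assuming the Unix epoch is an acceptable floor:

```python
# Fall back to a floor timestamp when no watermark exists yet (first run)
if last_watermark is None:
    last_watermark = "1970-01-01 00:00:00"

new_records = spark.read.table("bronze_orders") \
    .filter(col("created_at") > last_watermark)
```
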
### SCD Type 2 Pattern

```python
# Close existing records
spark.sql("""
    UPDATE dim_customer
    SET is_current = false, end_date = current_timestamp()
    WHERE customer_id IN (SELECT customer_id FROM staging_customer)
      AND is_current = true
""")

# Insert new versions
spark.sql("""
    INSERT INTO dim_customer
    SELECT
        customer_id,
        name,
        email,
        address,
        current_timestamp() AS start_date,
        NULL AS end_date,
        true AS is_current
    FROM staging_customer
""")
```

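As written, every staged customer receives a new version even when nothing changed. A hedged refinement that versions only new or genuinely changed rows, using Spark's null-safe equality operator `<=>` (the `staging_changed` view name is illustrative):

```python
# Keep only new customers and customers whose tracked attributes changed
spark.sql("""
    CREATE OR REPLACE TEMP VIEW staging_changed AS
    SELECT s.*
    FROM staging_customer s
    LEFT JOIN dim_customer d
        ON s.customer_id = d.customer_id AND d.is_current = true
    WHERE d.customer_id IS NULL
       OR NOT (s.name <=> d.name)
       OR NOT (s.email <=> d.email)
       OR NOT (s.address <=> d.address)
""")
# Then run the close/insert statements above against staging_changed
```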