mirror of
https://github.com/github/awesome-copilot.git
synced 2026-06-15 20:34:59 +00:00
chore: publish from staged
This commit is contained in:
@@ -176,6 +176,7 @@ See [CONTRIBUTING.md](../CONTRIBUTING.md#adding-instructions) for guidelines on
|
|||||||
| [Ruby on Rails](../instructions/ruby-on-rails.instructions.md)<br />[](https://aka.ms/awesome-copilot/install/instructions?url=vscode%3Achat-instructions%2Finstall%3Furl%3Dhttps%3A%2F%2Fraw.githubusercontent.com%2Fgithub%2Fawesome-copilot%2Fmain%2Finstructions%2Fruby-on-rails.instructions.md)<br />[](https://aka.ms/awesome-copilot/install/instructions?url=vscode-insiders%3Achat-instructions%2Finstall%3Furl%3Dhttps%3A%2F%2Fraw.githubusercontent.com%2Fgithub%2Fawesome-copilot%2Fmain%2Finstructions%2Fruby-on-rails.instructions.md) | Ruby on Rails coding conventions and guidelines |
|
| [Ruby on Rails](../instructions/ruby-on-rails.instructions.md)<br />[](https://aka.ms/awesome-copilot/install/instructions?url=vscode%3Achat-instructions%2Finstall%3Furl%3Dhttps%3A%2F%2Fraw.githubusercontent.com%2Fgithub%2Fawesome-copilot%2Fmain%2Finstructions%2Fruby-on-rails.instructions.md)<br />[](https://aka.ms/awesome-copilot/install/instructions?url=vscode-insiders%3Achat-instructions%2Finstall%3Furl%3Dhttps%3A%2F%2Fraw.githubusercontent.com%2Fgithub%2Fawesome-copilot%2Fmain%2Finstructions%2Fruby-on-rails.instructions.md) | Ruby on Rails coding conventions and guidelines |
|
||||||
| [Rust Coding Conventions and Best Practices](../instructions/rust.instructions.md)<br />[](https://aka.ms/awesome-copilot/install/instructions?url=vscode%3Achat-instructions%2Finstall%3Furl%3Dhttps%3A%2F%2Fraw.githubusercontent.com%2Fgithub%2Fawesome-copilot%2Fmain%2Finstructions%2Frust.instructions.md)<br />[](https://aka.ms/awesome-copilot/install/instructions?url=vscode-insiders%3Achat-instructions%2Finstall%3Furl%3Dhttps%3A%2F%2Fraw.githubusercontent.com%2Fgithub%2Fawesome-copilot%2Fmain%2Finstructions%2Frust.instructions.md) | Rust programming language coding conventions and best practices |
|
| [Rust Coding Conventions and Best Practices](../instructions/rust.instructions.md)<br />[](https://aka.ms/awesome-copilot/install/instructions?url=vscode%3Achat-instructions%2Finstall%3Furl%3Dhttps%3A%2F%2Fraw.githubusercontent.com%2Fgithub%2Fawesome-copilot%2Fmain%2Finstructions%2Frust.instructions.md)<br />[](https://aka.ms/awesome-copilot/install/instructions?url=vscode-insiders%3Achat-instructions%2Finstall%3Furl%3Dhttps%3A%2F%2Fraw.githubusercontent.com%2Fgithub%2Fawesome-copilot%2Fmain%2Finstructions%2Frust.instructions.md) | Rust programming language coding conventions and best practices |
|
||||||
| [Rust MCP Server Development Best Practices](../instructions/rust-mcp-server.instructions.md)<br />[](https://aka.ms/awesome-copilot/install/instructions?url=vscode%3Achat-instructions%2Finstall%3Furl%3Dhttps%3A%2F%2Fraw.githubusercontent.com%2Fgithub%2Fawesome-copilot%2Fmain%2Finstructions%2Frust-mcp-server.instructions.md)<br />[](https://aka.ms/awesome-copilot/install/instructions?url=vscode-insiders%3Achat-instructions%2Finstall%3Furl%3Dhttps%3A%2F%2Fraw.githubusercontent.com%2Fgithub%2Fawesome-copilot%2Fmain%2Finstructions%2Frust-mcp-server.instructions.md) | Best practices for building Model Context Protocol servers in Rust using the official rmcp SDK with async/await patterns |
|
| [Rust MCP Server Development Best Practices](../instructions/rust-mcp-server.instructions.md)<br />[](https://aka.ms/awesome-copilot/install/instructions?url=vscode%3Achat-instructions%2Finstall%3Furl%3Dhttps%3A%2F%2Fraw.githubusercontent.com%2Fgithub%2Fawesome-copilot%2Fmain%2Finstructions%2Frust-mcp-server.instructions.md)<br />[](https://aka.ms/awesome-copilot/install/instructions?url=vscode-insiders%3Achat-instructions%2Finstall%3Furl%3Dhttps%3A%2F%2Fraw.githubusercontent.com%2Fgithub%2Fawesome-copilot%2Fmain%2Finstructions%2Frust-mcp-server.instructions.md) | Best practices for building Model Context Protocol servers in Rust using the official rmcp SDK with async/await patterns |
|
||||||
|
| [Scala + Apache Spark Best Practices](../instructions/scala-spark.instructions.md)<br />[](https://aka.ms/awesome-copilot/install/instructions?url=vscode%3Achat-instructions%2Finstall%3Furl%3Dhttps%3A%2F%2Fraw.githubusercontent.com%2Fgithub%2Fawesome-copilot%2Fmain%2Finstructions%2Fscala-spark.instructions.md)<br />[](https://aka.ms/awesome-copilot/install/instructions?url=vscode-insiders%3Achat-instructions%2Finstall%3Furl%3Dhttps%3A%2F%2Fraw.githubusercontent.com%2Fgithub%2Fawesome-copilot%2Fmain%2Finstructions%2Fscala-spark.instructions.md) | Best practices for building Apache Spark applications in Scala, covering DataFrames, Datasets, SparkSQL, performance tuning, testing, and production deployment patterns. |
|
||||||
| [Scala Best Practices](../instructions/scala2.instructions.md)<br />[](https://aka.ms/awesome-copilot/install/instructions?url=vscode%3Achat-instructions%2Finstall%3Furl%3Dhttps%3A%2F%2Fraw.githubusercontent.com%2Fgithub%2Fawesome-copilot%2Fmain%2Finstructions%2Fscala2.instructions.md)<br />[](https://aka.ms/awesome-copilot/install/instructions?url=vscode-insiders%3Achat-instructions%2Finstall%3Furl%3Dhttps%3A%2F%2Fraw.githubusercontent.com%2Fgithub%2Fawesome-copilot%2Fmain%2Finstructions%2Fscala2.instructions.md) | Scala 2.12/2.13 programming language coding conventions and best practices following Databricks style guide for functional programming, type safety, and production code quality. |
|
| [Scala Best Practices](../instructions/scala2.instructions.md)<br />[](https://aka.ms/awesome-copilot/install/instructions?url=vscode%3Achat-instructions%2Finstall%3Furl%3Dhttps%3A%2F%2Fraw.githubusercontent.com%2Fgithub%2Fawesome-copilot%2Fmain%2Finstructions%2Fscala2.instructions.md)<br />[](https://aka.ms/awesome-copilot/install/instructions?url=vscode-insiders%3Achat-instructions%2Finstall%3Furl%3Dhttps%3A%2F%2Fraw.githubusercontent.com%2Fgithub%2Fawesome-copilot%2Fmain%2Finstructions%2Fscala2.instructions.md) | Scala 2.12/2.13 programming language coding conventions and best practices following Databricks style guide for functional programming, type safety, and production code quality. |
|
||||||
| [Security Standards](../instructions/security-and-owasp.instructions.md)<br />[](https://aka.ms/awesome-copilot/install/instructions?url=vscode%3Achat-instructions%2Finstall%3Furl%3Dhttps%3A%2F%2Fraw.githubusercontent.com%2Fgithub%2Fawesome-copilot%2Fmain%2Finstructions%2Fsecurity-and-owasp.instructions.md)<br />[](https://aka.ms/awesome-copilot/install/instructions?url=vscode-insiders%3Achat-instructions%2Finstall%3Furl%3Dhttps%3A%2F%2Fraw.githubusercontent.com%2Fgithub%2Fawesome-copilot%2Fmain%2Finstructions%2Fsecurity-and-owasp.instructions.md) | Comprehensive secure coding standards based on OWASP Top 10 2025, with 55+ anti-patterns, detection regex, framework-specific fixes for modern web and backend frameworks, and AI/LLM security guidance. |
|
| [Security Standards](../instructions/security-and-owasp.instructions.md)<br />[](https://aka.ms/awesome-copilot/install/instructions?url=vscode%3Achat-instructions%2Finstall%3Furl%3Dhttps%3A%2F%2Fraw.githubusercontent.com%2Fgithub%2Fawesome-copilot%2Fmain%2Finstructions%2Fsecurity-and-owasp.instructions.md)<br />[](https://aka.ms/awesome-copilot/install/instructions?url=vscode-insiders%3Achat-instructions%2Finstall%3Furl%3Dhttps%3A%2F%2Fraw.githubusercontent.com%2Fgithub%2Fawesome-copilot%2Fmain%2Finstructions%2Fsecurity-and-owasp.instructions.md) | Comprehensive secure coding standards based on OWASP Top 10 2025, with 55+ anti-patterns, detection regex, framework-specific fixes for modern web and backend frameworks, and AI/LLM security guidance. |
|
||||||
| [Self-explanatory Code Commenting Instructions](../instructions/self-explanatory-code-commenting.instructions.md)<br />[](https://aka.ms/awesome-copilot/install/instructions?url=vscode%3Achat-instructions%2Finstall%3Furl%3Dhttps%3A%2F%2Fraw.githubusercontent.com%2Fgithub%2Fawesome-copilot%2Fmain%2Finstructions%2Fself-explanatory-code-commenting.instructions.md)<br />[](https://aka.ms/awesome-copilot/install/instructions?url=vscode-insiders%3Achat-instructions%2Finstall%3Furl%3Dhttps%3A%2F%2Fraw.githubusercontent.com%2Fgithub%2Fawesome-copilot%2Fmain%2Finstructions%2Fself-explanatory-code-commenting.instructions.md) | Guidelines for GitHub Copilot to write comments to achieve self-explanatory code with less comments. Examples are in JavaScript but it should work on any language that has comments. |
|
| [Self-explanatory Code Commenting Instructions](../instructions/self-explanatory-code-commenting.instructions.md)<br />[](https://aka.ms/awesome-copilot/install/instructions?url=vscode%3Achat-instructions%2Finstall%3Furl%3Dhttps%3A%2F%2Fraw.githubusercontent.com%2Fgithub%2Fawesome-copilot%2Fmain%2Finstructions%2Fself-explanatory-code-commenting.instructions.md)<br />[](https://aka.ms/awesome-copilot/install/instructions?url=vscode-insiders%3Achat-instructions%2Finstall%3Furl%3Dhttps%3A%2F%2Fraw.githubusercontent.com%2Fgithub%2Fawesome-copilot%2Fmain%2Finstructions%2Fself-explanatory-code-commenting.instructions.md) | Guidelines for GitHub Copilot to write comments to achieve self-explanatory code with less comments. Examples are in JavaScript but it should work on any language that has comments. |
|
||||||
|
|||||||
@@ -0,0 +1,531 @@
|
|||||||
|
---
|
||||||
|
description: 'Best practices for building Apache Spark applications in Scala, covering DataFrames, Datasets, SparkSQL, performance tuning, testing, and production deployment patterns.'
|
||||||
|
applyTo: '**/*.scala, **/build.sbt, **/build.sc'
|
||||||
|
---
|
||||||
|
|
||||||
|
# Scala + Apache Spark Best Practices
|
||||||
|
|
||||||
|
Guidelines for writing efficient, maintainable, and production-ready Apache Spark applications in Scala.
|
||||||
|
|
||||||
|
## Dependencies
|
||||||
|
|
||||||
|
### SBT
|
||||||
|
|
||||||
|
```scala
|
||||||
|
val sparkVersion = "3.5.1"
|
||||||
|
|
||||||
|
libraryDependencies ++= Seq(
|
||||||
|
"org.apache.spark" %% "spark-core" % sparkVersion % "provided",
|
||||||
|
"org.apache.spark" %% "spark-sql" % sparkVersion % "provided",
|
||||||
|
"org.apache.spark" %% "spark-mllib" % sparkVersion % "provided",
|
||||||
|
"org.apache.spark" %% "spark-streaming" % sparkVersion % "provided"
|
||||||
|
)
|
||||||
|
```
|
||||||
|
|
||||||
|
### Maven
|
||||||
|
|
||||||
|
```xml
|
||||||
|
<properties>
|
||||||
|
<spark.version>3.5.1</spark.version>
|
||||||
|
<scala.binary.version>2.13</scala.binary.version>
|
||||||
|
</properties>
|
||||||
|
|
||||||
|
<dependencies>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.apache.spark</groupId>
|
||||||
|
<artifactId>spark-core_${scala.binary.version}</artifactId>
|
||||||
|
<version>${spark.version}</version>
|
||||||
|
<scope>provided</scope>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.apache.spark</groupId>
|
||||||
|
<artifactId>spark-sql_${scala.binary.version}</artifactId>
|
||||||
|
<version>${spark.version}</version>
|
||||||
|
<scope>provided</scope>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.apache.spark</groupId>
|
||||||
|
<artifactId>spark-mllib_${scala.binary.version}</artifactId>
|
||||||
|
<version>${spark.version}</version>
|
||||||
|
<scope>provided</scope>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.apache.spark</groupId>
|
||||||
|
<artifactId>spark-streaming_${scala.binary.version}</artifactId>
|
||||||
|
<version>${spark.version}</version>
|
||||||
|
<scope>provided</scope>
|
||||||
|
</dependency>
|
||||||
|
</dependencies>
|
||||||
|
```
|
||||||
|
|
||||||
|
Mark Spark dependencies as `"provided"` since the cluster supplies them at runtime. Only bundle application-specific libraries in the fat JAR.
|
||||||
|
|
||||||
|
## SparkSession Setup
|
||||||
|
|
||||||
|
Always use `SparkSession` as the single entry point:
|
||||||
|
|
||||||
|
```scala
|
||||||
|
import org.apache.spark.sql.SparkSession
|
||||||
|
|
||||||
|
val spark: SparkSession = SparkSession.builder()
|
||||||
|
.appName("MyApplication")
|
||||||
|
.config("spark.sql.shuffle.partitions", "200")
|
||||||
|
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
|
||||||
|
.getOrCreate()
|
||||||
|
|
||||||
|
import spark.implicits._
|
||||||
|
```
|
||||||
|
|
||||||
|
- Do **not** create multiple `SparkSession` instances in the same JVM.
|
||||||
|
- Avoid hardcoding `master` in application code; set it at submit time via `--master`.
|
||||||
|
|
||||||
|
## DataFrames vs Datasets vs RDDs
|
||||||
|
|
||||||
|
Prefer the **DataFrame API** (untyped `Dataset[Row]`) for most workloads. Use **Datasets** (typed) when compile-time type safety justifies the serialization overhead. Avoid raw **RDDs** unless you need low-level control.
|
||||||
|
|
||||||
|
```scala
|
||||||
|
import org.apache.spark.sql.{DataFrame, Dataset}
|
||||||
|
|
||||||
|
// Preferred — DataFrame API
|
||||||
|
val df: DataFrame = spark.read.parquet("data/events")
|
||||||
|
val result = df
|
||||||
|
.filter($"status" === "active")
|
||||||
|
.groupBy($"region")
|
||||||
|
.agg(count("*").as("total"))
|
||||||
|
|
||||||
|
// Typed Dataset — use when schema safety matters
|
||||||
|
case class Event(id: Long, status: String, region: String)
|
||||||
|
val ds: Dataset[Event] = df.as[Event]
|
||||||
|
val active = ds.filter(_.status == "active")
|
||||||
|
```
|
||||||
|
|
||||||
|
## Schema Management
|
||||||
|
|
||||||
|
Always define schemas explicitly when reading semi-structured data instead of relying on schema inference:
|
||||||
|
|
||||||
|
```scala
|
||||||
|
import org.apache.spark.sql.types._
|
||||||
|
|
||||||
|
val schema = StructType(Seq(
|
||||||
|
StructField("id", LongType, nullable = false),
|
||||||
|
StructField("name", StringType, nullable = true),
|
||||||
|
StructField("timestamp", TimestampType, nullable = false),
|
||||||
|
StructField("amount", DecimalType(18, 2), nullable = true),
|
||||||
|
StructField("tags", ArrayType(StringType), nullable = true)
|
||||||
|
))
|
||||||
|
|
||||||
|
val df = spark.read
|
||||||
|
.schema(schema)
|
||||||
|
.json("data/events/*.json")
|
||||||
|
```
|
||||||
|
|
||||||
|
- Schema inference (`inferSchema=true`) reads the entire data source and is expensive for large files.
|
||||||
|
- For Parquet and Delta, the schema is embedded — explicit definition is unnecessary.
|
||||||
|
|
||||||
|
## Column Expressions
|
||||||
|
|
||||||
|
Prefer `col()` or `$""` over string column names in transformations for early error detection:
|
||||||
|
|
||||||
|
```scala
|
||||||
|
import org.apache.spark.sql.functions._
|
||||||
|
|
||||||
|
// Good — type-checked column references
|
||||||
|
df.select(col("name"), $"amount" * 1.1 as "adjusted_amount")
|
||||||
|
|
||||||
|
// Avoid — string-only references delay errors to runtime
|
||||||
|
df.select("name", "amount")
|
||||||
|
```
|
||||||
|
|
||||||
|
## Joins
|
||||||
|
|
||||||
|
### Broadcast Joins
|
||||||
|
|
||||||
|
Broadcast the smaller side of a join when it fits in executor memory (typically < 100 MB):
|
||||||
|
|
||||||
|
```scala
|
||||||
|
import org.apache.spark.sql.functions.broadcast
|
||||||
|
|
||||||
|
val enriched = largeDF.join(
|
||||||
|
broadcast(smallLookupDF),
|
||||||
|
Seq("key"),
|
||||||
|
"left"
|
||||||
|
)
|
||||||
|
```
|
||||||
|
|
||||||
|
### Avoiding Cartesian Products
|
||||||
|
|
||||||
|
Never use cross joins unless intentional. Enable the safeguard:
|
||||||
|
|
||||||
|
```scala
|
||||||
|
spark.conf.set("spark.sql.crossJoin.enabled", "false")
|
||||||
|
```
|
||||||
|
|
||||||
|
### Skew Handling
|
||||||
|
|
||||||
|
For joins on skewed keys, salt the key to distribute load:
|
||||||
|
|
||||||
|
```scala
|
||||||
|
import org.apache.spark.sql.functions._
|
||||||
|
|
||||||
|
val saltBuckets = 10
|
||||||
|
val saltedLeft = leftDF.withColumn("salt", (rand() * saltBuckets).cast("int"))
|
||||||
|
val saltedRight = rightDF
|
||||||
|
.crossJoin((0 until saltBuckets).toDF("salt"))
|
||||||
|
|
||||||
|
val result = saltedLeft
|
||||||
|
.join(saltedRight, Seq("join_key", "salt"))
|
||||||
|
.drop("salt")
|
||||||
|
```
|
||||||
|
|
||||||
|
The tradeoff is that the right side grows by 10×, so this only works when the right side is reasonably small or the skew is severe enough to justify it. For Spark 3.x+, AQE's built-in skew join handling (`spark.sql.adaptive.skewJoin.enabled = true`) can do this automatically without manual salting.
|
||||||
|
|
||||||
|
## Partitioning and Bucketing
|
||||||
|
|
||||||
|
### Write Partitioning
|
||||||
|
|
||||||
|
Partition output by high-cardinality filter columns (e.g., date):
|
||||||
|
|
||||||
|
```scala
|
||||||
|
df.write
|
||||||
|
.partitionBy("year", "month")
|
||||||
|
.mode("overwrite")
|
||||||
|
.parquet("output/events")
|
||||||
|
```
|
||||||
|
|
||||||
|
- Avoid partitioning on high-cardinality columns (e.g., user ID) which creates millions of small files.
|
||||||
|
|
||||||
|
### Shuffle Partitions
|
||||||
|
|
||||||
|
Tune `spark.sql.shuffle.partitions` based on data volume:
|
||||||
|
|
||||||
|
```scala
|
||||||
|
// Default is 200; adjust based on data size
|
||||||
|
// Rule of thumb: target 128 MB per partition
|
||||||
|
spark.conf.set("spark.sql.shuffle.partitions", "400")
|
||||||
|
```
|
||||||
|
|
||||||
|
### Repartition vs Coalesce
|
||||||
|
|
||||||
|
```scala
|
||||||
|
// Repartition — full shuffle, use to increase or evenly distribute partitions
|
||||||
|
df.repartition(100, $"key")
|
||||||
|
|
||||||
|
// Coalesce — no shuffle, use only to reduce partition count
|
||||||
|
df.coalesce(10)
|
||||||
|
```
|
||||||
|
|
||||||
|
Never use `coalesce(1)` on large datasets — it forces all data through a single task.
|
||||||
|
|
||||||
|
## Caching and Persistence
|
||||||
|
|
||||||
|
Cache only when a DataFrame is reused multiple times:
|
||||||
|
|
||||||
|
```scala
|
||||||
|
import org.apache.spark.storage.StorageLevel
|
||||||
|
|
||||||
|
val cached = expensiveDF.persist(StorageLevel.MEMORY_AND_DISK)
|
||||||
|
cached.count() // materialize the cache
|
||||||
|
|
||||||
|
// Use cached DF multiple times
|
||||||
|
val summary = cached.groupBy("region").count()
|
||||||
|
val filtered = cached.filter($"amount" > 1000)
|
||||||
|
|
||||||
|
// Always unpersist when done
|
||||||
|
cached.unpersist()
|
||||||
|
```
|
||||||
|
|
||||||
|
- Prefer `MEMORY_AND_DISK` over `MEMORY_ONLY` to avoid recomputation on eviction.
|
||||||
|
- Never cache DataFrames that are only used once.
|
||||||
|
|
||||||
|
## UDFs — Use Sparingly
|
||||||
|
|
||||||
|
Prefer built-in Spark SQL functions over UDFs. UDFs disable Catalyst optimizations and require serialization:
|
||||||
|
|
||||||
|
```scala
|
||||||
|
import org.apache.spark.sql.functions._
|
||||||
|
|
||||||
|
// Good — use built-in functions
|
||||||
|
df.withColumn("upper_name", upper($"name"))
|
||||||
|
.withColumn("name_length", length($"name"))
|
||||||
|
|
||||||
|
// Avoid — UDF for something built-in functions handle
|
||||||
|
val upperUdf = udf((s: String) => s.toUpperCase)
|
||||||
|
df.withColumn("upper_name", upperUdf($"name"))
|
||||||
|
```
|
||||||
|
|
||||||
|
When a UDF is unavoidable, prefer `spark.udf.register` for SparkSQL compatibility, and handle nulls explicitly:
|
||||||
|
|
||||||
|
```scala
|
||||||
|
val parseStatus = udf((raw: String) => {
|
||||||
|
Option(raw).map(_.trim.toLowerCase) match {
|
||||||
|
case Some("active") | Some("enabled") => "ACTIVE"
|
||||||
|
case Some("inactive") | Some("disabled") => "INACTIVE"
|
||||||
|
case _ => "UNKNOWN"
|
||||||
|
}
|
||||||
|
})
|
||||||
|
```
|
||||||
|
|
||||||
|
## Window Functions
|
||||||
|
|
||||||
|
Use window functions for ranking, running totals, and lag/lead calculations:
|
||||||
|
|
||||||
|
```scala
|
||||||
|
import org.apache.spark.sql.expressions.Window
|
||||||
|
|
||||||
|
val windowSpec = Window
|
||||||
|
.partitionBy("department")
|
||||||
|
.orderBy($"salary".desc)
|
||||||
|
|
||||||
|
val ranked = df
|
||||||
|
.withColumn("rank", rank().over(windowSpec))
|
||||||
|
.withColumn("dense_rank", dense_rank().over(windowSpec))
|
||||||
|
.withColumn("row_number", row_number().over(windowSpec))
|
||||||
|
.withColumn("running_total", sum($"salary").over(
|
||||||
|
Window.partitionBy("department").orderBy("hire_date")
|
||||||
|
.rowsBetween(Window.unboundedPreceding, Window.currentRow)
|
||||||
|
))
|
||||||
|
```
|
||||||
|
|
||||||
|
## Error Handling
|
||||||
|
|
||||||
|
### Corrupt Record Handling
|
||||||
|
|
||||||
|
```scala
|
||||||
|
val df = spark.read
|
||||||
|
.option("mode", "PERMISSIVE") // default: keeps corrupt rows
|
||||||
|
.option("columnNameOfCorruptRecord", "_corrupt_record")
|
||||||
|
.schema(schema)
|
||||||
|
.json("data/events")
|
||||||
|
|
||||||
|
val clean = df.filter($"_corrupt_record".isNull).drop("_corrupt_record")
|
||||||
|
val bad = df.filter($"_corrupt_record".isNotNull)
|
||||||
|
bad.write.json("data/quarantine")
|
||||||
|
```
|
||||||
|
|
||||||
|
### Accumulator-Based Error Counting
|
||||||
|
|
||||||
|
```scala
|
||||||
|
val parseErrors = spark.sparkContext.longAccumulator("parseErrors")
|
||||||
|
|
||||||
|
val parsed = df.map { row =>
|
||||||
|
try {
|
||||||
|
parseRow(row)
|
||||||
|
} catch {
|
||||||
|
case _: Exception =>
|
||||||
|
parseErrors.add(1)
|
||||||
|
null
|
||||||
|
}
|
||||||
|
}.filter(_ != null)
|
||||||
|
|
||||||
|
println(s"Parse errors: ${parseErrors.value}")
|
||||||
|
```
|
||||||
|
|
||||||
|
> **Caveat:** Accumulators are only guaranteed accurate inside actions (`count`, `collect`, `write`). If tasks are retried due to failures, accumulators can over-count. For exact error tracking, prefer the quarantine pattern above; use accumulators for operational monitoring only.
|
||||||
|
|
||||||
|
## Streaming (Structured Streaming)
|
||||||
|
|
||||||
|
```scala
|
||||||
|
val stream = spark.readStream
|
||||||
|
.format("kafka")
|
||||||
|
.option("kafka.bootstrap.servers", "broker:9092")
|
||||||
|
.option("subscribe", "events")
|
||||||
|
.option("startingOffsets", "latest")
|
||||||
|
.load()
|
||||||
|
|
||||||
|
val parsed = stream
|
||||||
|
.selectExpr("CAST(value AS STRING) as json")
|
||||||
|
.select(from_json($"json", schema).as("data"))
|
||||||
|
.select("data.*")
|
||||||
|
|
||||||
|
val query = parsed.writeStream
|
||||||
|
.format("delta")
|
||||||
|
.option("checkpointLocation", "/checkpoints/events")
|
||||||
|
.outputMode("append")
|
||||||
|
.trigger(Trigger.ProcessingTime("30 seconds"))
|
||||||
|
.start("output/events")
|
||||||
|
|
||||||
|
query.awaitTermination()
|
||||||
|
```
|
||||||
|
|
||||||
|
- Always set a checkpoint location for fault tolerance.
|
||||||
|
- Use `Trigger.ProcessingTime` or `Trigger.AvailableNow` — avoid `Trigger.Once` in production (use `AvailableNow` instead).
|
||||||
|
|
||||||
|
## Delta Lake Integration
|
||||||
|
|
||||||
|
```scala
|
||||||
|
import io.delta.tables.DeltaTable
|
||||||
|
|
||||||
|
// Upsert / merge
|
||||||
|
val target = DeltaTable.forPath(spark, "data/customers")
|
||||||
|
|
||||||
|
target.as("t")
|
||||||
|
.merge(updatesDF.as("s"), "t.id = s.id")
|
||||||
|
.whenMatched.updateAll()
|
||||||
|
.whenNotMatched.insertAll()
|
||||||
|
.execute()
|
||||||
|
|
||||||
|
// Time travel
|
||||||
|
val yesterday = spark.read
|
||||||
|
.format("delta")
|
||||||
|
.option("timestampAsOf", "2025-01-15")
|
||||||
|
.load("data/customers")
|
||||||
|
|
||||||
|
// Optimize and vacuum
|
||||||
|
target.optimize().executeCompaction()
|
||||||
|
target.vacuum(168) // retain 7 days
|
||||||
|
```
|
||||||
|
|
||||||
|
## Performance Tuning Checklist
|
||||||
|
|
||||||
|
1. **Minimize shuffles** — use `broadcast` joins, pre-partition data, avoid unnecessary `groupBy`.
|
||||||
|
2. **Avoid `collect()` on large DataFrames** — it pulls all data to the driver.
|
||||||
|
3. **Prefer `explain(true)`** to inspect physical plans before running expensive jobs.
|
||||||
|
4. **Enable Adaptive Query Execution (AQE)**:
|
||||||
|
```scala
|
||||||
|
spark.conf.set("spark.sql.adaptive.enabled", "true")
|
||||||
|
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
|
||||||
|
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
|
||||||
|
```
|
||||||
|
5. **Use columnar formats** (Parquet, Delta, ORC) over CSV/JSON for analytical workloads.
|
||||||
|
6. **Predicate pushdown** — filter early in the query plan; place filters before joins.
|
||||||
|
7. **Column pruning** — `select` only needed columns instead of `select("*")`.
|
||||||
|
8. **Avoid `distinct()` before `groupBy`** — the aggregation already deduplicates.
|
||||||
|
|
||||||
|
## Testing
|
||||||
|
|
||||||
|
### Unit Testing Transformations
|
||||||
|
|
||||||
|
Test pure transformation functions without a SparkSession when possible:
|
||||||
|
|
||||||
|
```scala
|
||||||
|
import org.scalatest.funsuite.AnyFunSuite
|
||||||
|
|
||||||
|
class TransformationsTest extends AnyFunSuite {
|
||||||
|
test("parseStatus maps known values correctly") {
|
||||||
|
assert(parseStatus("active") == "ACTIVE")
|
||||||
|
assert(parseStatus("DISABLED") == "INACTIVE")
|
||||||
|
assert(parseStatus(null) == "UNKNOWN")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
### Integration Testing with SparkSession
|
||||||
|
|
||||||
|
Use a shared `SparkSession` for DataFrame-level tests:
|
||||||
|
|
||||||
|
```scala
|
||||||
|
import org.apache.spark.sql.SparkSession
|
||||||
|
import org.scalatest.BeforeAndAfterAll
|
||||||
|
import org.scalatest.funsuite.AnyFunSuite
|
||||||
|
|
||||||
|
trait SparkTestBase extends AnyFunSuite with BeforeAndAfterAll {
|
||||||
|
lazy val spark: SparkSession = SparkSession.builder()
|
||||||
|
.master("local[2]")
|
||||||
|
.appName("test")
|
||||||
|
.config("spark.sql.shuffle.partitions", "2")
|
||||||
|
.getOrCreate()
|
||||||
|
|
||||||
|
override def afterAll(): Unit = {
|
||||||
|
spark.stop()
|
||||||
|
super.afterAll()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
class EventPipelineTest extends SparkTestBase {
|
||||||
|
import spark.implicits._
|
||||||
|
|
||||||
|
test("pipeline filters inactive events") {
|
||||||
|
val input = Seq(
|
||||||
|
Event(1L, "active", "US"),
|
||||||
|
Event(2L, "inactive", "EU")
|
||||||
|
).toDS()
|
||||||
|
|
||||||
|
val result = filterActive(input)
|
||||||
|
assert(result.count() == 1)
|
||||||
|
assert(result.collect().head.status == "active")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
## Application Packaging
|
||||||
|
|
||||||
|
### Fat JAR with sbt-assembly
|
||||||
|
|
||||||
|
```scala
|
||||||
|
// project/plugins.sbt
|
||||||
|
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "2.1.5")
|
||||||
|
|
||||||
|
// build.sbt
|
||||||
|
assembly / assemblyMergeStrategy := {
|
||||||
|
case PathList("META-INF", _*) => MergeStrategy.discard
|
||||||
|
case _ => MergeStrategy.first
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
### Spark Submit
|
||||||
|
|
||||||
|
```bash
|
||||||
|
spark-submit \
|
||||||
|
--class com.example.MainApp \
|
||||||
|
--master yarn \
|
||||||
|
--deploy-mode cluster \
|
||||||
|
--num-executors 10 \
|
||||||
|
--executor-memory 8g \
|
||||||
|
--executor-cores 4 \
|
||||||
|
--conf spark.sql.adaptive.enabled=true \
|
||||||
|
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
|
||||||
|
target/scala-2.13/my-app-assembly-1.0.jar \
|
||||||
|
--input s3://bucket/input \
|
||||||
|
--output s3://bucket/output
|
||||||
|
```
|
||||||
|
|
||||||
|
## Common Anti-Patterns
|
||||||
|
|
||||||
|
| Anti-Pattern | Why It's Bad | Fix |
|
||||||
|
|---|---|---|
|
||||||
|
| `collect()` on large data | OOM on driver | Use `take(n)`, `show()`, or write to storage |
|
||||||
|
| `count()` inside loops | Triggers full DAG evaluation each time | Cache and count once |
|
||||||
|
| UDF for built-in operations | Disables Catalyst optimizer | Use `org.apache.spark.sql.functions._` |
|
||||||
|
| `var` for DataFrames | Mutable references cause confusion | Chain transformations or use `val` |
|
||||||
|
| Schema inference on CSV/JSON | Reads entire source, fragile | Define `StructType` explicitly |
|
||||||
|
| `coalesce(1)` on large data | Single-task bottleneck | Use `repartition` with reasonable count |
|
||||||
|
| Nested `map` on RDDs | Quadratic complexity | Use `join` or `broadcast` |
|
||||||
|
| Ignoring data skew | Straggler tasks, OOM | Salt keys or use AQE skew handling |
|
||||||
|
|
||||||
|
## Dynamic Allocation
|
||||||
|
|
||||||
|
Enable dynamic allocation to let Spark scale executors up and down based on workload demand. This is essential for shared clusters where fixed executor counts waste resources during idle stages:
|
||||||
|
|
||||||
|
```scala
|
||||||
|
spark.conf.set("spark.dynamicAllocation.enabled", "true")
|
||||||
|
spark.conf.set("spark.dynamicAllocation.minExecutors", "2")
|
||||||
|
spark.conf.set("spark.dynamicAllocation.maxExecutors", "50")
|
||||||
|
spark.conf.set("spark.dynamicAllocation.initialExecutors", "5")
|
||||||
|
spark.conf.set("spark.dynamicAllocation.executorIdleTimeout", "60s")
|
||||||
|
spark.conf.set("spark.dynamicAllocation.schedulerBacklogTimeout", "1s")
|
||||||
|
```
|
||||||
|
|
||||||
|
Or via `spark-submit`:
|
||||||
|
|
||||||
|
```bash
|
||||||
|
spark-submit \
|
||||||
|
--conf spark.dynamicAllocation.enabled=true \
|
||||||
|
--conf spark.dynamicAllocation.minExecutors=2 \
|
||||||
|
--conf spark.dynamicAllocation.maxExecutors=50 \
|
||||||
|
--conf spark.shuffle.service.enabled=true \
|
||||||
|
...
|
||||||
|
```
|
||||||
|
|
||||||
|
Key settings:
|
||||||
|
|
||||||
|
| Setting | Purpose |
|
||||||
|
|---|---|
|
||||||
|
| `minExecutors` | Floor — always keep at least this many executors running |
|
||||||
|
| `maxExecutors` | Ceiling — cap to prevent monopolizing the cluster |
|
||||||
|
| `initialExecutors` | Starting count before auto-scaling kicks in |
|
||||||
|
| `executorIdleTimeout` | Remove idle executors after this duration (default 60s) |
|
||||||
|
| `schedulerBacklogTimeout` | Request new executors when tasks have been pending this long |
|
||||||
|
|
||||||
|
- **Requires `spark.shuffle.service.enabled=true`** on YARN/Mesos — an external shuffle service preserves shuffle files after executors are removed. Without it, removed executors lose their shuffle data, forcing costly recomputation.
|
||||||
|
- On **Kubernetes**, use `spark.dynamicAllocation.shuffleTracking.enabled=true` instead (no external shuffle service needed).
|
||||||
|
- **Do not combine** `--num-executors` with dynamic allocation — they conflict. Remove `--num-executors` when enabling dynamic allocation.
|
||||||
Reference in New Issue
Block a user