mirror of
https://github.com/github/awesome-copilot.git
synced 2026-06-13 11:33:32 +00:00
chore: publish from staged
@@ -176,6 +176,7 @@ See [CONTRIBUTING.md](../CONTRIBUTING.md#adding-instructions) for guidelines on
|
||||
| [Ruby on Rails](../instructions/ruby-on-rails.instructions.md)<br />[](https://aka.ms/awesome-copilot/install/instructions?url=vscode%3Achat-instructions%2Finstall%3Furl%3Dhttps%3A%2F%2Fraw.githubusercontent.com%2Fgithub%2Fawesome-copilot%2Fmain%2Finstructions%2Fruby-on-rails.instructions.md)<br />[](https://aka.ms/awesome-copilot/install/instructions?url=vscode-insiders%3Achat-instructions%2Finstall%3Furl%3Dhttps%3A%2F%2Fraw.githubusercontent.com%2Fgithub%2Fawesome-copilot%2Fmain%2Finstructions%2Fruby-on-rails.instructions.md) | Ruby on Rails coding conventions and guidelines |
|
||||
| [Rust Coding Conventions and Best Practices](../instructions/rust.instructions.md)<br />[](https://aka.ms/awesome-copilot/install/instructions?url=vscode%3Achat-instructions%2Finstall%3Furl%3Dhttps%3A%2F%2Fraw.githubusercontent.com%2Fgithub%2Fawesome-copilot%2Fmain%2Finstructions%2Frust.instructions.md)<br />[](https://aka.ms/awesome-copilot/install/instructions?url=vscode-insiders%3Achat-instructions%2Finstall%3Furl%3Dhttps%3A%2F%2Fraw.githubusercontent.com%2Fgithub%2Fawesome-copilot%2Fmain%2Finstructions%2Frust.instructions.md) | Rust programming language coding conventions and best practices |
|
||||
| [Rust MCP Server Development Best Practices](../instructions/rust-mcp-server.instructions.md)<br />[](https://aka.ms/awesome-copilot/install/instructions?url=vscode%3Achat-instructions%2Finstall%3Furl%3Dhttps%3A%2F%2Fraw.githubusercontent.com%2Fgithub%2Fawesome-copilot%2Fmain%2Finstructions%2Frust-mcp-server.instructions.md)<br />[](https://aka.ms/awesome-copilot/install/instructions?url=vscode-insiders%3Achat-instructions%2Finstall%3Furl%3Dhttps%3A%2F%2Fraw.githubusercontent.com%2Fgithub%2Fawesome-copilot%2Fmain%2Finstructions%2Frust-mcp-server.instructions.md) | Best practices for building Model Context Protocol servers in Rust using the official rmcp SDK with async/await patterns |
|
||||
| [Scala + Apache Spark Best Practices](../instructions/scala-spark.instructions.md)<br />[](https://aka.ms/awesome-copilot/install/instructions?url=vscode%3Achat-instructions%2Finstall%3Furl%3Dhttps%3A%2F%2Fraw.githubusercontent.com%2Fgithub%2Fawesome-copilot%2Fmain%2Finstructions%2Fscala-spark.instructions.md)<br />[](https://aka.ms/awesome-copilot/install/instructions?url=vscode-insiders%3Achat-instructions%2Finstall%3Furl%3Dhttps%3A%2F%2Fraw.githubusercontent.com%2Fgithub%2Fawesome-copilot%2Fmain%2Finstructions%2Fscala-spark.instructions.md) | Best practices for building Apache Spark applications in Scala, covering DataFrames, Datasets, SparkSQL, performance tuning, testing, and production deployment patterns. |
|
||||
| [Scala Best Practices](../instructions/scala2.instructions.md)<br />[](https://aka.ms/awesome-copilot/install/instructions?url=vscode%3Achat-instructions%2Finstall%3Furl%3Dhttps%3A%2F%2Fraw.githubusercontent.com%2Fgithub%2Fawesome-copilot%2Fmain%2Finstructions%2Fscala2.instructions.md)<br />[](https://aka.ms/awesome-copilot/install/instructions?url=vscode-insiders%3Achat-instructions%2Finstall%3Furl%3Dhttps%3A%2F%2Fraw.githubusercontent.com%2Fgithub%2Fawesome-copilot%2Fmain%2Finstructions%2Fscala2.instructions.md) | Scala 2.12/2.13 programming language coding conventions and best practices following Databricks style guide for functional programming, type safety, and production code quality. |
|
||||
| [Security Standards](../instructions/security-and-owasp.instructions.md)<br />[](https://aka.ms/awesome-copilot/install/instructions?url=vscode%3Achat-instructions%2Finstall%3Furl%3Dhttps%3A%2F%2Fraw.githubusercontent.com%2Fgithub%2Fawesome-copilot%2Fmain%2Finstructions%2Fsecurity-and-owasp.instructions.md)<br />[](https://aka.ms/awesome-copilot/install/instructions?url=vscode-insiders%3Achat-instructions%2Finstall%3Furl%3Dhttps%3A%2F%2Fraw.githubusercontent.com%2Fgithub%2Fawesome-copilot%2Fmain%2Finstructions%2Fsecurity-and-owasp.instructions.md) | Comprehensive secure coding standards based on OWASP Top 10 2025, with 55+ anti-patterns, detection regex, framework-specific fixes for modern web and backend frameworks, and AI/LLM security guidance. |
|
||||
| [Self-explanatory Code Commenting Instructions](../instructions/self-explanatory-code-commenting.instructions.md)<br />[](https://aka.ms/awesome-copilot/install/instructions?url=vscode%3Achat-instructions%2Finstall%3Furl%3Dhttps%3A%2F%2Fraw.githubusercontent.com%2Fgithub%2Fawesome-copilot%2Fmain%2Finstructions%2Fself-explanatory-code-commenting.instructions.md)<br />[](https://aka.ms/awesome-copilot/install/instructions?url=vscode-insiders%3Achat-instructions%2Finstall%3Furl%3Dhttps%3A%2F%2Fraw.githubusercontent.com%2Fgithub%2Fawesome-copilot%2Fmain%2Finstructions%2Fself-explanatory-code-commenting.instructions.md) | Guidelines for GitHub Copilot to write comments to achieve self-explanatory code with less comments. Examples are in JavaScript but it should work on any language that has comments. |
|
||||
|
||||
@@ -0,0 +1,531 @@
|
||||
---
|
||||
description: 'Best practices for building Apache Spark applications in Scala, covering DataFrames, Datasets, SparkSQL, performance tuning, testing, and production deployment patterns.'
|
||||
applyTo: '**/*.scala, **/build.sbt, **/build.sc'
|
||||
---
|
||||
|
||||
# Scala + Apache Spark Best Practices
|
||||
|
||||
Guidelines for writing efficient, maintainable, and production-ready Apache Spark applications in Scala.
|
||||
|
||||
## Dependencies
|
||||
|
||||
### SBT
|
||||
|
||||
```scala
|
||||
val sparkVersion = "3.5.1"
|
||||
|
||||
libraryDependencies ++= Seq(
|
||||
"org.apache.spark" %% "spark-core" % sparkVersion % "provided",
|
||||
"org.apache.spark" %% "spark-sql" % sparkVersion % "provided",
|
||||
"org.apache.spark" %% "spark-mllib" % sparkVersion % "provided",
|
||||
"org.apache.spark" %% "spark-streaming" % sparkVersion % "provided"
|
||||
)
|
||||
```
|
||||
|
||||
### Maven
|
||||
|
||||
```xml
|
||||
<properties>
|
||||
<spark.version>3.5.1</spark.version>
|
||||
<scala.binary.version>2.13</scala.binary.version>
|
||||
</properties>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.apache.spark</groupId>
|
||||
<artifactId>spark-core_${scala.binary.version}</artifactId>
|
||||
<version>${spark.version}</version>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.spark</groupId>
|
||||
<artifactId>spark-sql_${scala.binary.version}</artifactId>
|
||||
<version>${spark.version}</version>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.spark</groupId>
|
||||
<artifactId>spark-mllib_${scala.binary.version}</artifactId>
|
||||
<version>${spark.version}</version>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.spark</groupId>
|
||||
<artifactId>spark-streaming_${scala.binary.version}</artifactId>
|
||||
<version>${spark.version}</version>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
```
|
||||
|
||||
Mark Spark dependencies as `"provided"` since the cluster supplies them at runtime. Only bundle application-specific libraries in the fat JAR.
|
||||
|
||||
## SparkSession Setup
|
||||
|
||||
Always use `SparkSession` as the single entry point:
|
||||
|
||||
```scala
|
||||
import org.apache.spark.sql.SparkSession
|
||||
|
||||
val spark: SparkSession = SparkSession.builder()
|
||||
.appName("MyApplication")
|
||||
.config("spark.sql.shuffle.partitions", "200")
|
||||
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
|
||||
.getOrCreate()
|
||||
|
||||
import spark.implicits._
|
||||
```
|
||||
|
||||
- Do **not** create multiple `SparkSession` instances in the same JVM.
|
||||
- Avoid hardcoding `master` in application code; set it at submit time via `--master`.
|
||||
|
||||
## DataFrames vs Datasets vs RDDs
|
||||
|
||||
Prefer the **DataFrame API** (untyped `Dataset[Row]`) for most workloads. Use **Datasets** (typed) when compile-time type safety justifies the serialization overhead. Avoid raw **RDDs** unless you need low-level control.
|
||||
|
||||
```scala
|
||||
import org.apache.spark.sql.{DataFrame, Dataset}
import org.apache.spark.sql.functions.count
|
||||
|
||||
// Preferred — DataFrame API
|
||||
val df: DataFrame = spark.read.parquet("data/events")
|
||||
val result = df
|
||||
.filter($"status" === "active")
|
||||
.groupBy($"region")
|
||||
.agg(count("*").as("total"))
|
||||
|
||||
// Typed Dataset — use when schema safety matters
|
||||
case class Event(id: Long, status: String, region: String)
|
||||
val ds: Dataset[Event] = df.as[Event]
|
||||
val active = ds.filter(_.status == "active")
|
||||
```
|
||||
|
||||
## Schema Management
|
||||
|
||||
Always define schemas explicitly when reading semi-structured data instead of relying on schema inference:
|
||||
|
||||
```scala
|
||||
import org.apache.spark.sql.types._
|
||||
|
||||
val schema = StructType(Seq(
|
||||
StructField("id", LongType, nullable = false),
|
||||
StructField("name", StringType, nullable = true),
|
||||
StructField("timestamp", TimestampType, nullable = false),
|
||||
StructField("amount", DecimalType(18, 2), nullable = true),
|
||||
StructField("tags", ArrayType(StringType), nullable = true)
|
||||
))
|
||||
|
||||
val df = spark.read
|
||||
.schema(schema)
|
||||
.json("data/events/*.json")
|
||||
```
|
||||
|
||||
- Schema inference (`inferSchema=true` for CSV, the default behavior for JSON) requires an extra pass over the data and is expensive for large files.
|
||||
- For Parquet and Delta, the schema is embedded — explicit definition is unnecessary.
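
When the data is later read as a typed `Dataset`, one way to keep the explicit schema and the case class in sync is to derive the `StructType` from the case class encoder. The following is a minimal sketch, assuming the `Event` case class used elsewhere in this guide; the `EventSchema` object name is hypothetical.

```scala
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.types.StructType

// Mirrors the Event case class used in the Dataset examples.
case class Event(id: Long, status: String, region: String)

object EventSchema {
  // Derives the StructType from the case class; no SparkSession is needed for this.
  val schema: StructType = Encoders.product[Event].schema
}

// Possible usage (assumes a SparkSession named `spark` is in scope):
//   spark.read.schema(EventSchema.schema).json("data/events/*.json")
```

Primitive fields such as `Long` come out as non-nullable, so a hand-written `StructType` remains the better choice when nullability must differ from what the case class implies.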
|
||||
|
||||
## Column Expressions
|
||||
|
||||
Prefer `col()` or `$""` over bare string column names in transformations. `Column` objects compose into expressions (arithmetic, aliases, casts), while string names can only refer to existing columns. Neither form is checked at compile time; both are resolved when Spark analyzes the query:

```scala
import org.apache.spark.sql.functions._

// Good: Column references compose into expressions
df.select(col("name"), ($"amount" * 1.1).as("adjusted_amount"))

// Limited: string names can only select existing columns as-is
df.select("name", "amount")
|
||||
```
|
||||
|
||||
## Joins
|
||||
|
||||
### Broadcast Joins
|
||||
|
||||
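Broadcast the smaller side of a join when it comfortably fits in driver and executor memory (typically < 100 MB). Spark broadcasts automatically only below `spark.sql.autoBroadcastJoinThreshold` (10 MB by default), so larger lookup tables need the explicit hint: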
|
||||
|
||||
```scala
|
||||
import org.apache.spark.sql.functions.broadcast
|
||||
|
||||
val enriched = largeDF.join(
|
||||
broadcast(smallLookupDF),
|
||||
Seq("key"),
|
||||
"left"
|
||||
)
|
||||
```
|
||||
|
||||
### Avoiding Cartesian Products
|
||||
|
||||
Never use cross joins unless intentional. Enable the safeguard:
|
||||
|
||||
```scala
|
||||
spark.conf.set("spark.sql.crossJoin.enabled", "false")
|
||||
```
|
||||
|
||||
### Skew Handling
|
||||
|
||||
For joins on skewed keys, salt the key to distribute load:
|
||||
|
||||
```scala
|
||||
import org.apache.spark.sql.functions._
|
||||
|
||||
val saltBuckets = 10
|
||||
val saltedLeft = leftDF.withColumn("salt", (rand() * saltBuckets).cast("int"))
|
||||
val saltedRight = rightDF
|
||||
.crossJoin((0 until saltBuckets).toDF("salt"))
|
||||
|
||||
val result = saltedLeft
|
||||
.join(saltedRight, Seq("join_key", "salt"))
|
||||
.drop("salt")
|
||||
```
|
||||
|
||||
The tradeoff is that the right side grows by 10×, so this only works when the right side is reasonably small or the skew is severe enough to justify it. For Spark 3.x+, AQE's built-in skew join handling (`spark.sql.adaptive.skewJoin.enabled = true`) can do this automatically without manual salting.
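
To see why salting helps, the effect can be modeled in plain Scala without a cluster. This is only a sketch on made-up data: the key counts, the `SaltingSketch` object, and the round-robin salt (used here instead of `rand()` so the result is deterministic) are all assumptions for illustration.

```scala
object SaltingSketch {
  // Size of the largest group, i.e. the most rows any single join task would receive.
  def maxGroupSize[K](keys: Seq[K]): Int =
    keys.groupBy(identity).values.map(_.size).max

  val saltBuckets = 10

  // Made-up skewed input: one hot key and two small ones.
  val keys: Seq[String] =
    Seq.fill(1000)("hot") ++ Seq.fill(10)("a") ++ Seq.fill(10)("b")

  // Round-robin salt instead of rand() so the outcome is reproducible.
  val saltedKeys: Seq[(String, Int)] =
    keys.zipWithIndex.map { case (key, i) => (key, i % saltBuckets) }

  def main(args: Array[String]): Unit = {
    println(s"largest group without salt: ${maxGroupSize(keys)}")
    println(s"largest group with salt:    ${maxGroupSize(saltedKeys)}")
  }
}
```

With this input the largest group shrinks from 1000 rows to 100, which is the load a single task would otherwise carry alone. A random salt gives roughly, not exactly, even buckets.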
|
||||
|
||||
## Partitioning and Bucketing
|
||||
|
||||
### Write Partitioning
|
||||
|
||||
Partition output by low-cardinality columns that queries commonly filter on (e.g., date parts):
|
||||
|
||||
```scala
|
||||
df.write
|
||||
.partitionBy("year", "month")
|
||||
.mode("overwrite")
|
||||
.parquet("output/events")
|
||||
```
|
||||
|
||||
- Avoid partitioning on high-cardinality columns (e.g., user ID), which creates millions of small files.
|
||||
|
||||
### Shuffle Partitions
|
||||
|
||||
Tune `spark.sql.shuffle.partitions` based on data volume:
|
||||
|
||||
```scala
|
||||
// Default is 200; adjust based on data size
|
||||
// Rule of thumb: target 128 MB per partition
|
||||
spark.conf.set("spark.sql.shuffle.partitions", "400")
|
||||
```
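
The 128 MB rule of thumb can be turned into a small helper. This is a rough sketch rather than a tuning formula: the `ShufflePartitions` object and its parameters are hypothetical, and the shuffle size has to be estimated by the caller (for example from the Spark UI of a previous run).

```scala
object ShufflePartitions {
  // Target size per shuffle partition, following the 128 MB rule of thumb.
  val TargetPartitionBytes: Long = 128L * 1024 * 1024

  // Suggested partition count for an estimated shuffle size, never below minPartitions.
  def suggested(shuffleBytes: Long, minPartitions: Int = 1): Int = {
    require(shuffleBytes >= 0, "shuffleBytes must be non-negative")
    val needed = math.ceil(shuffleBytes.toDouble / TargetPartitionBytes).toInt
    math.max(needed, minPartitions)
  }
}
```

For example, `ShufflePartitions.suggested(50L * 1024 * 1024 * 1024)` gives 400 for 50 GiB of shuffle data. The result can then be passed to `spark.conf.set("spark.sql.shuffle.partitions", ...)` as a string.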
|
||||
|
||||
### Repartition vs Coalesce
|
||||
|
||||
```scala
|
||||
// Repartition — full shuffle, use to increase or evenly distribute partitions
|
||||
df.repartition(100, $"key")
|
||||
|
||||
// Coalesce — no shuffle, use only to reduce partition count
|
||||
df.coalesce(10)
|
||||
```
|
||||
|
||||
Never use `coalesce(1)` on large datasets — it forces all data through a single task.
|
||||
|
||||
## Caching and Persistence
|
||||
|
||||
Cache only when a DataFrame is reused multiple times:
|
||||
|
||||
```scala
|
||||
import org.apache.spark.storage.StorageLevel
|
||||
|
||||
val cached = expensiveDF.persist(StorageLevel.MEMORY_AND_DISK)
|
||||
cached.count() // materialize the cache
|
||||
|
||||
// Use cached DF multiple times
|
||||
val summary = cached.groupBy("region").count()
|
||||
val filtered = cached.filter($"amount" > 1000)
|
||||
|
||||
// Always unpersist when done
|
||||
cached.unpersist()
|
||||
```
|
||||
|
||||
- Prefer `MEMORY_AND_DISK` over `MEMORY_ONLY` to avoid recomputation on eviction.
|
||||
- Never cache DataFrames that are only used once.
|
||||
|
||||
## UDFs — Use Sparingly
|
||||
|
||||
Prefer built-in Spark SQL functions over UDFs. UDFs disable Catalyst optimizations and require serialization:
|
||||
|
||||
```scala
|
||||
import org.apache.spark.sql.functions._
|
||||
|
||||
// Good — use built-in functions
|
||||
df.withColumn("upper_name", upper($"name"))
|
||||
.withColumn("name_length", length($"name"))
|
||||
|
||||
// Avoid — UDF for something built-in functions handle
|
||||
val upperUdf = udf((s: String) => s.toUpperCase)
|
||||
df.withColumn("upper_name", upperUdf($"name"))
|
||||
```
|
||||
|
||||
When a UDF is unavoidable, prefer `spark.udf.register` for SparkSQL compatibility, and handle nulls explicitly:
|
||||
|
||||
```scala
|
||||
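import org.apache.spark.sql.functions.udf

// Keep the logic in a plain function so it can be unit-tested without Spark.
def parseStatus(raw: String): String =
  Option(raw).map(_.trim.toLowerCase) match {
    case Some("active") | Some("enabled") => "ACTIVE"
    case Some("inactive") | Some("disabled") => "INACTIVE"
    case _ => "UNKNOWN"
  }

// Wrap it for the DataFrame API and register it for SQL queries.
val parseStatusUdf = udf(parseStatus _)
spark.udf.register("parse_status", parseStatusUdf)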
|
||||
```
|
||||
|
||||
## Window Functions
|
||||
|
||||
Use window functions for ranking, running totals, and lag/lead calculations:
|
||||
|
||||
```scala
|
||||
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
|
||||
|
||||
val windowSpec = Window
|
||||
.partitionBy("department")
|
||||
.orderBy($"salary".desc)
|
||||
|
||||
val ranked = df
|
||||
.withColumn("rank", rank().over(windowSpec))
|
||||
.withColumn("dense_rank", dense_rank().over(windowSpec))
|
||||
.withColumn("row_number", row_number().over(windowSpec))
|
||||
.withColumn("running_total", sum($"salary").over(
|
||||
Window.partitionBy("department").orderBy("hire_date")
|
||||
.rowsBetween(Window.unboundedPreceding, Window.currentRow)
|
||||
))
|
||||
```
|
||||
|
||||
## Error Handling
|
||||
|
||||
### Corrupt Record Handling
|
||||
|
||||
```scala
|
||||
val df = spark.read
|
||||
.option("mode", "PERMISSIVE") // default: keeps corrupt rows
|
||||
.option("columnNameOfCorruptRecord", "_corrupt_record")
|
||||
.schema(schema.add("_corrupt_record", "string")) // the corrupt-record column must be part of the schema
|
||||
.json("data/events")
|
||||
|
||||
val clean = df.filter($"_corrupt_record".isNull).drop("_corrupt_record")
|
||||
val bad = df.filter($"_corrupt_record".isNotNull)
|
||||
bad.write.json("data/quarantine")
|
||||
```
|
||||
|
||||
### Accumulator-Based Error Counting
|
||||
|
||||
```scala
|
||||
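import scala.util.{Failure, Success, Try}

val parseErrors = spark.sparkContext.longAccumulator("parseErrors")

// parseRow is assumed to be a user-defined Row => Event function.
// flatMap drops failed rows without emitting nulls into the Dataset.
val parsed = df.flatMap { row =>
  Try(parseRow(row)) match {
    case Success(event) => Seq(event)
    case Failure(_) =>
      parseErrors.add(1)
      Seq.empty[Event]
  }
}

parsed.count() // run an action first; the accumulator is only updated when tasks execute
println(s"Parse errors: ${parseErrors.value}")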
|
||||
```
|
||||
|
||||
> **Caveat:** Spark guarantees exactly-once accumulator updates only for updates made inside actions (e.g., `foreach`). Updates made inside transformations such as `map` can be applied more than once when tasks are retried or stages are recomputed, so the counter can over-count. For exact error tracking, prefer the quarantine pattern above; use accumulators for operational monitoring only.
|
||||
|
||||
## Streaming (Structured Streaming)
|
||||
|
||||
```scala
|
||||
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.streaming.Trigger

val stream = spark.readStream
|
||||
.format("kafka")
|
||||
.option("kafka.bootstrap.servers", "broker:9092")
|
||||
.option("subscribe", "events")
|
||||
.option("startingOffsets", "latest")
|
||||
.load()
|
||||
|
||||
val parsed = stream
|
||||
.selectExpr("CAST(value AS STRING) as json")
|
||||
.select(from_json($"json", schema).as("data"))
|
||||
.select("data.*")
|
||||
|
||||
val query = parsed.writeStream
|
||||
.format("delta")
|
||||
.option("checkpointLocation", "/checkpoints/events")
|
||||
.outputMode("append")
|
||||
.trigger(Trigger.ProcessingTime("30 seconds"))
|
||||
.start("output/events")
|
||||
|
||||
query.awaitTermination()
|
||||
```
|
||||
|
||||
- Always set a checkpoint location for fault tolerance.
|
||||
- Use `Trigger.ProcessingTime` or `Trigger.AvailableNow`; avoid `Trigger.Once` in production, since it is deprecated as of Spark 3.4 in favor of `AvailableNow`.
|
||||
|
||||
## Delta Lake Integration
|
||||
|
||||
```scala
|
||||
import io.delta.tables.DeltaTable
|
||||
|
||||
// Upsert / merge
|
||||
val target = DeltaTable.forPath(spark, "data/customers")
|
||||
|
||||
target.as("t")
|
||||
.merge(updatesDF.as("s"), "t.id = s.id")
|
||||
.whenMatched.updateAll()
|
||||
.whenNotMatched.insertAll()
|
||||
.execute()
|
||||
|
||||
// Time travel
|
||||
val yesterday = spark.read
|
||||
.format("delta")
|
||||
.option("timestampAsOf", "2025-01-15")
|
||||
.load("data/customers")
|
||||
|
||||
// Optimize and vacuum
|
||||
target.optimize().executeCompaction()
|
||||
target.vacuum(168) // retain 7 days
|
||||
```
|
||||
|
||||
## Performance Tuning Checklist
|
||||
|
||||
1. **Minimize shuffles** — use `broadcast` joins, pre-partition data, avoid unnecessary `groupBy`.
|
||||
2. **Avoid `collect()` on large DataFrames** — it pulls all data to the driver.
|
||||
3. **Use `explain(true)`** to inspect the logical and physical plans before running expensive jobs.
|
||||
4. **Enable Adaptive Query Execution (AQE)**:
|
||||
```scala
|
||||
spark.conf.set("spark.sql.adaptive.enabled", "true")
|
||||
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
|
||||
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
|
||||
```
|
||||
5. **Use columnar formats** (Parquet, Delta, ORC) over CSV/JSON for analytical workloads.
|
||||
6. **Predicate pushdown** — filter early in the query plan; place filters before joins.
|
||||
7. **Column pruning** — `select` only needed columns instead of `select("*")`.
|
||||
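8. **Avoid a redundant `distinct()` before `groupBy`**: for aggregates that ignore duplicates (e.g., `max`, `min`), grouping already collapses rows per key, so the extra `distinct()` only adds a shuffle.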
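
Several checklist items can be verified from the query plan before a job runs. The sketch below builds a small join on made-up data, prints the plans with `explain(true)`, and checks the physical plan text for a broadcast hash join. The `PlanInspection` object, the `usesBroadcastJoin` helper, and the sample columns are hypothetical, and matching on the plan string is a convenience for illustration, not a stable API contract.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.broadcast

object PlanInspection {
  // Hypothetical helper: true when the physical plan text mentions a broadcast hash join.
  def usesBroadcastJoin(df: DataFrame): Boolean =
    df.queryExecution.executedPlan.toString.contains("BroadcastHashJoin")

  def main(args: Array[String]): Unit = {
    // local[2] is for this sketch only; real jobs get their master from spark-submit.
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("plan-inspection")
      .getOrCreate()
    import spark.implicits._

    // Made-up sample data.
    val orders = (1 to 1000).map(i => (i % 10, i.toDouble)).toDF("region_id", "amount")
    val regions = (0 until 10).map(i => (i, s"region_$i")).toDF("region_id", "region")

    val joined = orders
      .filter($"amount" > 100)                      // filter before the join
      .join(broadcast(regions), Seq("region_id"))   // broadcast the small side
      .select("region", "amount")                   // keep only the needed columns

    joined.explain(true) // prints parsed, analyzed, optimized, and physical plans
    println(s"broadcast join planned: ${usesBroadcastJoin(joined)}")

    spark.stop()
  }
}
```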
|
||||
|
||||
## Testing
|
||||
|
||||
### Unit Testing Transformations
|
||||
|
||||
Test pure transformation functions without a SparkSession when possible:
|
||||
|
||||
```scala
|
||||
import org.scalatest.funsuite.AnyFunSuite
|
||||
|
||||
class TransformationsTest extends AnyFunSuite {
|
||||
test("parseStatus maps known values correctly") {
|
||||
assert(parseStatus("active") == "ACTIVE")
|
||||
assert(parseStatus("DISABLED") == "INACTIVE")
|
||||
assert(parseStatus(null) == "UNKNOWN")
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
### Integration Testing with SparkSession
|
||||
|
||||
Use a shared `SparkSession` for DataFrame-level tests:
|
||||
|
||||
```scala
|
||||
import org.apache.spark.sql.SparkSession
|
||||
import org.scalatest.BeforeAndAfterAll
|
||||
import org.scalatest.funsuite.AnyFunSuite
|
||||
|
||||
trait SparkTestBase extends AnyFunSuite with BeforeAndAfterAll {
|
||||
lazy val spark: SparkSession = SparkSession.builder()
|
||||
.master("local[2]")
|
||||
.appName("test")
|
||||
.config("spark.sql.shuffle.partitions", "2")
|
||||
.getOrCreate()
|
||||
|
||||
override def afterAll(): Unit = {
|
||||
spark.stop()
|
||||
super.afterAll()
|
||||
}
|
||||
}
|
||||
|
||||
class EventPipelineTest extends SparkTestBase {
|
||||
import spark.implicits._
|
||||
|
||||
test("pipeline filters inactive events") {
|
||||
val input = Seq(
|
||||
Event(1L, "active", "US"),
|
||||
Event(2L, "inactive", "EU")
|
||||
).toDS()
|
||||
|
||||
val result = filterActive(input)
|
||||
assert(result.count() == 1)
|
||||
assert(result.collect().head.status == "active")
|
||||
}
|
||||
}
|
||||
```
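
The test above calls `filterActive`, which is not shown in this guide. A minimal sketch of what it might look like, assuming it simply keeps events whose status is `"active"` (the `EventPipeline` object name is hypothetical, and `Event` mirrors the case class defined earlier):

```scala
import org.apache.spark.sql.Dataset

case class Event(id: Long, status: String, region: String)

object EventPipeline {
  // Keeps only events whose status is exactly "active".
  // Dataset[Event] already carries its encoder, so no implicits are needed here.
  def filterActive(events: Dataset[Event]): Dataset[Event] =
    events.filter(_.status == "active")
}
```

Keeping the transformation as a function from `Dataset` to `Dataset` is what lets the test build a tiny in-memory input with `toDS()` instead of reading real files.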
|
||||
|
||||
## Application Packaging
|
||||
|
||||
### Fat JAR with sbt-assembly
|
||||
|
||||
```scala
|
||||
// project/plugins.sbt
|
||||
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "2.1.5")
|
||||
|
||||
// build.sbt
|
||||
assembly / assemblyMergeStrategy := {
|
||||
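case PathList("META-INF", "services", _*) => MergeStrategy.concat // keep ServiceLoader registrations
case PathList("META-INF", _*) => MergeStrategy.discard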
|
||||
case _ => MergeStrategy.first
|
||||
}
|
||||
```
|
||||
|
||||
### Spark Submit
|
||||
|
||||
```bash
|
||||
spark-submit \
|
||||
--class com.example.MainApp \
|
||||
--master yarn \
|
||||
--deploy-mode cluster \
|
||||
--num-executors 10 \
|
||||
--executor-memory 8g \
|
||||
--executor-cores 4 \
|
||||
--conf spark.sql.adaptive.enabled=true \
|
||||
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
|
||||
target/scala-2.13/my-app-assembly-1.0.jar \
|
||||
--input s3://bucket/input \
|
||||
--output s3://bucket/output
|
||||
```
|
||||
|
||||
## Common Anti-Patterns
|
||||
|
||||
| Anti-Pattern | Why It's Bad | Fix |
|
||||
|---|---|---|
|
||||
| `collect()` on large data | OOM on driver | Use `take(n)`, `show()`, or write to storage |
|
||||
| `count()` inside loops | Triggers full DAG evaluation each time | Cache and count once |
|
||||
| UDF for built-in operations | Disables Catalyst optimizer | Use `org.apache.spark.sql.functions._` |
|
||||
| `var` for DataFrames | Mutable references cause confusion | Chain transformations or use `val` |
|
||||
| Schema inference on CSV/JSON | Reads entire source, fragile | Define `StructType` explicitly |
|
||||
| `coalesce(1)` on large data | Single-task bottleneck | Use `repartition` with reasonable count |
|
||||
| Nested `map` on RDDs | RDDs cannot be used inside other RDD transformations; driver-side workarounds are quadratic | Use `join` or `broadcast` |
|
||||
| Ignoring data skew | Straggler tasks, OOM | Salt keys or use AQE skew handling |
|
||||
|
||||
## Dynamic Allocation
|
||||
|
||||
Enable dynamic allocation to let Spark scale executors up and down based on workload demand. This is essential for shared clusters where fixed executor counts waste resources during idle stages:
|
||||
|
||||
```scala
|
||||
// Core settings are read at application start: set them on the builder or via spark-submit.
val spark = SparkSession.builder()
  .appName("MyApplication")
  .config("spark.dynamicAllocation.enabled", "true")
  .config("spark.dynamicAllocation.minExecutors", "2")
  .config("spark.dynamicAllocation.maxExecutors", "50")
  .config("spark.dynamicAllocation.initialExecutors", "5")
  .config("spark.dynamicAllocation.executorIdleTimeout", "60s")
  .config("spark.dynamicAllocation.schedulerBacklogTimeout", "1s")
  .getOrCreate()
|
||||
```
|
||||
|
||||
Or via `spark-submit`:
|
||||
|
||||
```bash
|
||||
spark-submit \
|
||||
--conf spark.dynamicAllocation.enabled=true \
|
||||
--conf spark.dynamicAllocation.minExecutors=2 \
|
||||
--conf spark.dynamicAllocation.maxExecutors=50 \
|
||||
--conf spark.shuffle.service.enabled=true \
|
||||
...
|
||||
```
|
||||
|
||||
Key settings:
|
||||
|
||||
| Setting | Purpose |
|
||||
|---|---|
|
||||
| `minExecutors` | Floor — always keep at least this many executors running |
|
||||
| `maxExecutors` | Ceiling — cap to prevent monopolizing the cluster |
|
||||
| `initialExecutors` | Starting count before auto-scaling kicks in |
|
||||
| `executorIdleTimeout` | Remove idle executors after this duration (default 60s) |
|
||||
| `schedulerBacklogTimeout` | Request new executors when tasks have been pending this long |
|
||||
|
||||
- **Requires `spark.shuffle.service.enabled=true`** on YARN/Mesos — an external shuffle service preserves shuffle files after executors are removed. Without it, removed executors lose their shuffle data, forcing costly recomputation.
|
||||
- On **Kubernetes**, use `spark.dynamicAllocation.shuffleTracking.enabled=true` instead (no external shuffle service needed).
|
||||
- **Do not combine** `--num-executors` with dynamic allocation — they conflict. Remove `--num-executors` when enabling dynamic allocation.
|
||||