From qe-framework
Optimizes Apache Spark jobs for distributed data processing: DataFrame transformations, Spark SQL, RDD pipelines, shuffle tuning, executor memory, partitioning, and structured streaming.
Slash command: `/qe-framework:Qspark-engineer`
Senior Apache Spark engineer specializing in high-performance distributed data processing, optimizing large-scale ETL pipelines, and building production-grade Spark applications.
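Inspect the current partition count with `df.rdd.getNumPartitions()`.

```python
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.functions import broadcast
from pyspark.sql.types import DataType, StructField, StructType

# Pattern 1: Schema-driven DataFrame creation
def create_typed_dataframe(spark: SparkSession, data: list, schema_dict: dict[str, DataType]) -> DataFrame:
    """Create a DataFrame with an explicit schema instead of type inference.

    schema_dict maps column names to DataType instances, e.g. {"id": LongType()}.
    """
    schema = StructType([StructField(k, v, True) for k, v in schema_dict.items()])  # all nullable
    return spark.createDataFrame(data, schema=schema)

# Pattern 2: Broadcast dimension join
def broadcast_dimension_join(large_df: DataFrame, dim_df: DataFrame, join_key: str) -> DataFrame:
    """Join a large fact table to a small dimension (<200 MB) by broadcasting it,
    so the fact table is not shuffled."""
    return large_df.join(broadcast(dim_df), on=join_key, how="left")

# Pattern 3: Cache and materialize immediately
def cache_with_validation(df: DataFrame, operation_name: str) -> DataFrame:
    """Cache a DataFrame and materialize it now, so caching cost shows up at this step.

    count() does not report spill; check the Spark UI Storage tab for memory vs. disk usage.
    """
    cached = df.cache()  # lazy: only marks the plan for caching
    row_count = cached.count()  # action: computes and stores every partition
    print(f"{operation_name}: cached {row_count} rows")
    return cached
```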
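Pattern 2's 200 MB ceiling sits well above Spark's own automatic cutoff, `spark.sql.autoBroadcastJoinThreshold` (10 MB by default). The sketch below is a hypothetical helper, not part of any Spark API, that makes the decision explicit from an estimated dimension size. The 200 MB ceiling is the guideline above rather than a Spark limit, and should be tuned to driver and executor memory.

```python
# Hypothetical helper: pick a join approach from an estimated dimension-table size.
MB = 1024 * 1024
DEFAULT_AUTO_BROADCAST_BYTES = 10 * MB  # Spark default for spark.sql.autoBroadcastJoinThreshold
EXPLICIT_BROADCAST_CEILING_BYTES = 200 * MB  # assumption: guideline from Pattern 2


def join_strategy(dim_size_bytes: int) -> str:
    """Return "auto-broadcast", "explicit-broadcast", or "shuffle-join"."""
    if dim_size_bytes < 0:
        raise ValueError("size must be non-negative")
    if dim_size_bytes <= DEFAULT_AUTO_BROADCAST_BYTES:
        # Spark usually broadcasts this on its own, if its size statistics are accurate
        return "auto-broadcast"
    if dim_size_bytes <= EXPLICIT_BROADCAST_CEILING_BYTES:
        # Above the auto threshold: wrap with broadcast(dim_df) as in Pattern 2
        return "explicit-broadcast"
    # Too large to copy to every executor; let Spark shuffle both sides
    return "shuffle-join"
```

Estimating `dim_size_bytes` is left to the caller, for example from the size of the source files on storage.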
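Docstring template (Google style):

```python
from pyspark.sql import DataFrame
from pyspark.sql import functions as F

def transform_spark_data(df: DataFrame, threshold: float) -> DataFrame:
    """One-line transformation summary.

    Longer: explain Spark strategy, partition assumptions, performance implications.
    Args:
        df: Input Spark DataFrame
        threshold: Filtering threshold
    Returns:
        Transformed Spark DataFrame
    Raises:
        ValueError: If threshold < 0
    """
    if threshold < 0:
        raise ValueError(f"threshold must be >= 0, got {threshold}")
    # Placeholder body: "value" is a hypothetical column name; replace with the real one.
    return df.filter(F.col("value") >= threshold)
```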
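```toml
[tool.ruff]
line-length = 120

[tool.ruff.lint]
select = ["E", "F", "W", "UP"]
ignore = ["E501"]  # the linter skips long lines; the formatter still wraps at line-length

[tool.mypy]
python_version = "3.10"
disallow_untyped_defs = true
ignore_missing_imports = true
```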
Prefer `pandas_udf` with schema validation.

| Anti-pattern | Fix |
|---|---|
| `df.collect()` on a large DataFrame | Use `.limit()`, write to storage, or `.sample(0.1)` |
| No partitioning strategy in ETL | Size `spark.sql.shuffle.partitions` to the data (e.g. 400) or call `repartition(400)` |
| Row-at-a-time Python UDFs | Use `pandas_udf` or built-in `pyspark.sql.functions` (e.g. `F.when`, `F.regexp_replace`) |
| Caching every intermediate DataFrame | Cache only reused DataFrames; release them with `.unpersist()` |
| Ignoring shuffle metrics in the Spark UI | Check the Stages tab; if spill exceeds ~10% of shuffle size, adjust partition count or join strategy |
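The spill rule in the last row can be written as a small check over numbers read off the Spark UI. This is a minimal sketch under assumptions: `StageMetrics` is a hypothetical container filled in by hand or from monitoring, and "spill > 10%" is read as disk spill divided by shuffle read bytes for each stage.

```python
from dataclasses import dataclass


@dataclass
class StageMetrics:
    """Hypothetical per-stage numbers copied from the Spark UI Stages tab."""
    stage_id: int
    shuffle_read_bytes: int
    spill_disk_bytes: int


def stages_needing_attention(stages: list[StageMetrics], max_spill_ratio: float = 0.10) -> list[int]:
    """Return ids of stages whose disk spill exceeds max_spill_ratio of their shuffle read."""
    flagged = []
    for s in stages:
        if s.shuffle_read_bytes == 0:
            continue  # no shuffle input, so the ratio is undefined
        if s.spill_disk_bytes / s.shuffle_read_bytes > max_spill_ratio:
            flagged.append(s.stage_id)
    return flagged
```

Flagged stages are candidates for more shuffle partitions or a broadcast join; the 10% cutoff is the heuristic from the table, not a Spark setting.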
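```python
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .config("spark.sql.shuffle.partitions", "400")  # default is 200
    .config("spark.sql.adaptive.enabled", "true")  # AQE; on by default since Spark 3.2
    .config("spark.memory.fraction", "0.8")  # default 0.6; leaves less heap for user objects
    .getOrCreate()
)
```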
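Per Spark's memory-management documentation, `spark.memory.fraction` applies to the JVM heap minus a fixed 300 MB reserve. The result is the unified region shared by execution and storage (half of it protected for cached blocks by default via `spark.memory.storageFraction`), and the remainder is user memory. A rough calculator, assuming the executor heap equals `spark.executor.memory` (the JVM may report slightly less):

```python
RESERVED_MB = 300  # fixed reserve in Spark's unified memory model


def memory_split_mb(heap_mb: float, fraction: float = 0.8, storage_fraction: float = 0.5) -> dict[str, float]:
    """Approximate how an executor heap is divided under the unified memory model."""
    usable = heap_mb - RESERVED_MB
    if usable <= 0:
        raise ValueError("heap must exceed the 300 MB reserved region")
    unified = usable * fraction  # execution + storage
    return {
        "unified": unified,
        "storage_protected": unified * storage_fraction,  # cache space execution cannot evict
        "user": usable - unified,  # user data structures and UDF objects
    }
```

For an 8192 MB heap, a fraction of 0.8 yields about 6314 MB unified and 1578 MB user memory, versus about 4735 MB and 3157 MB at the default 0.6. Raising the fraction trades user-code headroom for execution and cache space.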
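MUST: Define explicit schemas, partition deliberately (typically 200-1000 shuffle partitions for large jobs, and at least 2-3 tasks per CPU core), broadcast small dimensions, monitor the Spark UI, and test at production scale.
MUST NOT: Collect large data to the driver, skip schema definitions, cache everything, ignore shuffle metrics, or use row-at-a-time Python UDFs on large data.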
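One way to turn the partitioning rule into a number: size partitions by shuffle volume (a ~128 MB target is a common assumption, not a Spark default), never go below the 2-3 tasks per core that the Spark tuning guide recommends, and round up to whole waves of cores. A hypothetical helper:

```python
import math


def shuffle_partitions(shuffle_bytes: int, total_cores: int,
                       target_partition_mb: int = 128, min_tasks_per_core: int = 2) -> int:
    """Suggest a value for spark.sql.shuffle.partitions (heuristic, not a Spark API)."""
    if total_cores <= 0:
        raise ValueError("total_cores must be positive")
    # Enough partitions to keep each one near the target size
    by_size = math.ceil(shuffle_bytes / (target_partition_mb * 1024 * 1024))
    # Enough tasks to keep every core busy
    floor = total_cores * min_tasks_per_core
    n = max(by_size, floor)
    # Round up to a multiple of the core count so the last wave uses every core
    return math.ceil(n / total_cores) * total_cores
```

With 50 GiB of shuffle data on 100 cores this gives 400, matching the `spark.sql.shuffle.partitions` value in the session config; on 64 cores it rounds up to 448 (7 full waves).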
Install: `npx claudepluginhub inho-team/qe-framework --plugin qe-framework`