From palantir-pack
Builds Palantir Foundry data pipelines using Python @transform decorators, PySpark DataFrames, and input/output wiring for ETL and dataset processing.
Slash command: `/palantir-pack:palantir-core-workflow-a`
Build Foundry data pipelines using the `transforms-python` library. Covers the `@transform` and `@transform_df` decorators, input/output dataset wiring, incremental transforms, and `@configure` for Spark tuning. This is the primary workflow for all data processing in Foundry.
Prerequisite: `palantir-install-auth` setup

my-transforms-repo/
├── src/
│   └── myproject/
│       ├── __init__.py
│       ├── pipeline.py          # Main transforms
│       ├── utils.py             # Shared logic
│       └── datasets.py          # Dataset path constants
├── build.gradle                 # Foundry build config
├── conda_recipe/meta.yaml       # Dependency declarations
└── settings.gradle
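The layout lists `datasets.py` as the home for dataset path constants but does not show its contents. A minimal sketch of what such a module could look like follows; the constant names are hypothetical, and only the path strings come from the transforms in this document.

```python
# src/myproject/datasets.py (hypothetical contents, not taken from the original repo)
"""Central place for dataset paths so transforms do not repeat string literals."""

# Assumed common prefix; the paths below match the ones used in pipeline.py.
ROOT = "/Company/datasets"

RAW_ORDERS = f"{ROOT}/raw_orders"
CLEANED_ORDERS = f"{ROOT}/cleaned_orders"
CUSTOMERS = f"{ROOT}/customers"
ORDER_ENRICHED = f"{ROOT}/order_enriched"
REPORT_SUMMARY = f"{ROOT}/report_summary"
```

A transform could then write `Input(RAW_ORDERS)` instead of repeating the literal, so a renamed dataset only needs a change in one place.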
# src/myproject/pipeline.py
from transforms.api import transform_df, Input, Output


@transform_df(
    Output("/Company/datasets/cleaned_orders"),
    orders=Input("/Company/datasets/raw_orders"),
)
def clean_orders(orders):
    """Clean raw orders: drop null IDs, filter test data, normalize dates."""
    from pyspark.sql import functions as F

    return (
        orders
        .filter(F.col("order_id").isNotNull())
        # Exclude test accounts; rows with a null email are dropped too,
        # because the predicate evaluates to null for them.
        .filter(~F.col("email").like("%@test.com"))
        .withColumn("order_date", F.to_date("order_date_str", "yyyy-MM-dd"))
        # Round before casting: a bare cast truncates, so a double 19.99 * 100
        # would become 1998 instead of 1999.
        .withColumn("total_cents", F.round(F.col("total") * 100).cast("long"))
        .drop("order_date_str", "total")
    )
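The `total_cents` conversion deserves a closer look. Assuming `total` is stored as a floating-point value (the source does not state its type), multiplying by 100 and converting straight to an integer truncates, which can lose a cent. The plain-Python sketch below reproduces the arithmetic outside Spark; the helper names are hypothetical.

```python
def to_cents_truncating(total: float) -> int:
    """Mimic 'multiply by 100, then cast to an integer type' (truncation)."""
    return int(total * 100)


def to_cents_rounded(total: float) -> int:
    """Round to the nearest cent before converting to an integer."""
    return round(total * 100)


# 19.99 * 100 is 1998.9999999999998 in binary floating point.
print(to_cents_truncating(19.99))  # 1998
print(to_cents_rounded(19.99))     # 1999
```

If `total` is a decimal column instead, the multiplication is exact and the two approaches agree.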
@transform_df(
    Output("/Company/datasets/order_enriched"),
    orders=Input("/Company/datasets/cleaned_orders"),
    customers=Input("/Company/datasets/customers"),
)
def enrich_orders(orders, customers):
    """Join orders with customer data for analytics."""
    from pyspark.sql import functions as F

    return (
        orders
        .join(customers, orders.customer_id == customers.id, "left")
        .select(
            orders.order_id,
            orders.order_date,
            orders.total_cents,
            customers.name.alias("customer_name"),
            customers.segment,
            customers.region,
        )
        .withColumn("processed_at", F.current_timestamp())
    )
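The `"left"` join keeps every order, including those with no matching customer, whose customer columns come back null. The sketch below models that behaviour with plain Python dictionaries and made-up sample rows. It assumes customer `id` values are unique; with duplicates, a real join would emit one output row per match.

```python
def left_join_orders(orders, customers):
    """Left-join orders to customers on customer_id == id (plain-Python model)."""
    by_id = {c["id"]: c for c in customers}  # assumes unique customer ids
    joined = []
    for order in orders:
        customer = by_id.get(order["customer_id"], {})
        joined.append({
            "order_id": order["order_id"],
            "customer_name": customer.get("name"),  # None when unmatched
            "segment": customer.get("segment"),
            "region": customer.get("region"),
        })
    return joined


# Illustrative sample data, not taken from any real dataset.
sample_orders = [
    {"order_id": 1, "customer_id": 10},
    {"order_id": 2, "customer_id": 99},  # no such customer
]
sample_customers = [{"id": 10, "name": "Ada", "segment": "SMB", "region": "EU"}]

result = left_join_orders(sample_orders, sample_customers)
```

Here the second order survives with `customer_name`, `segment`, and `region` all set to `None`, so downstream aggregations by region or segment should expect a null group.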
from transforms.api import transform, Input, Output


@transform(
    output=Output("/Company/datasets/report_summary"),
    orders=Input("/Company/datasets/order_enriched"),
)
def generate_summary(orders, output):
    """Write an aggregated summary using the explicit input/output objects."""
    from pyspark.sql import functions as F

    # With @transform, inputs arrive as wrapper objects; .dataframe() returns the DataFrame.
    df = orders.dataframe()
    summary = (
        df.groupBy("region", "segment")
        .agg(
            F.sum("total_cents").alias("revenue_cents"),
            F.count("order_id").alias("order_count"),
        )
    )
    output.write_dataframe(summary)
from transforms.api import transform_df, Input, Output, incremental


@incremental()
@transform_df(
    Output("/Company/datasets/daily_events"),
    events=Input("/Company/datasets/raw_events"),
)
def process_events_incrementally(events):
    """Process only rows added since the last build; much faster for append-only data."""
    from pyspark.sql import functions as F

    return events.withColumn("ingested_at", F.current_timestamp())
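As a rough mental model of what "only process new rows since last build" means, the sketch below tracks how many input rows earlier builds consumed and applies the row function only to the remainder. It is a conceptual illustration in plain Python, not Foundry's implementation; `BuildState` and `incremental_build` are hypothetical names.

```python
from dataclasses import dataclass, field
from typing import Callable, List


@dataclass
class BuildState:
    """Hypothetical bookkeeping: rows consumed so far and the accumulated output."""
    rows_seen: int = 0
    output: List[dict] = field(default_factory=list)


def incremental_build(state: BuildState, input_rows: List[dict],
                      fn: Callable[[dict], dict]) -> int:
    """Apply fn only to rows appended since the previous build; return how many ran."""
    new_rows = input_rows[state.rows_seen:]
    state.output.extend(fn(row) for row in new_rows)  # append, never rewrite
    state.rows_seen = len(input_rows)
    return len(new_rows)


state = BuildState()
events = [{"id": 1}, {"id": 2}]
first = incremental_build(state, events, lambda r: {**r, "ingested": True})
events.append({"id": 3})  # upstream appends one row
second = incremental_build(state, events, lambda r: {**r, "ingested": True})
```

The model only works because the input is append-only and the function is row-wise. An aggregation over all rows cannot be computed from the new rows alone.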
from transforms.api import transform_df, Input, Output, configure


@configure(profile=["DRIVER_MEMORY_LARGE"])  # 16GB driver
@transform_df(
    Output("/Company/datasets/heavy_aggregation"),
    data=Input("/Company/datasets/large_dataset"),
)
def heavy_compute(data):
    """Resource-intensive transform needing extra Spark memory."""
    from pyspark.sql import functions as F

    return (
        data
        .groupBy("category")
        .agg(F.approx_count_distinct("user_id").alias("unique_users"))
    )
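Both `@configure` and `@incremental` are written above `@transform_df` in the examples here. Python applies stacked decorators bottom-up, so the outer decorator receives whatever the inner one returned. The sketch below shows that ordering with hypothetical stand-in decorators (`transform_like`, `configure_like`); it models the stacking only and makes no claim about how the real `transforms.api` objects are built.

```python
def transform_like(fn):
    """Stand-in for the inner decorator: wraps the function in a 'transform' record."""
    return {"compute": fn, "profiles": []}


def configure_like(profile):
    """Stand-in for the outer decorator: annotates an already-built transform record."""
    def decorator(transform_obj):
        transform_obj["profiles"].extend(profile)
        return transform_obj
    return decorator


@configure_like(profile=["DRIVER_MEMORY_LARGE"])  # applied second
@transform_like                                   # applied first
def heavy(data):
    return data


# 'heavy' is now the annotated record, not the bare function.
print(heavy["profiles"])
```

Reversing the two decorators in this model would pass the raw function to `configure_like`, which fails because a function has no `"profiles"` entry.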
| Error | Cause | Solution |
|---|---|---|
| `DatasetNotFound` | Wrong path string | Check dataset path in Foundry UI (right-click > Copy path) |
| `AnalysisException: cannot resolve` | Column name mismatch | Print `df.columns` to debug; Foundry columns are case-sensitive |
| `OutOfMemoryError` | Insufficient Spark memory | Add `@configure(profile=["DRIVER_MEMORY_LARGE"])` |
| `Transform is not incremental-compatible` | Using non-append operations | Only use filter/select/withColumn in incremental transforms |
| Build hangs | Circular dependency | Check that no two transforms reference each other's output |
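For the circular-dependency case, one way to check a pipeline by hand is to write down which inputs each output depends on and search that graph for a cycle. The sketch below is a generic depth-first search in plain Python; `find_cycle` is a hypothetical helper, not part of any Foundry tooling, and the dependency map has to be assembled manually from the `Input`/`Output` paths.

```python
def find_cycle(deps):
    """Return a list of paths forming a cycle, or None.

    deps maps each output path to the list of input paths it reads.
    """
    IN_PROGRESS, DONE = 1, 2
    state = {}
    stack = []

    def visit(node):
        state[node] = IN_PROGRESS
        stack.append(node)
        for upstream in deps.get(node, []):
            if state.get(upstream) == IN_PROGRESS:
                # Reached a node that is still on the current path: cycle found.
                return stack[stack.index(upstream):] + [upstream]
            if upstream not in state:
                found = visit(upstream)
                if found:
                    return found
        stack.pop()
        state[node] = DONE
        return None

    for node in list(deps):
        if node not in state:
            found = visit(node)
            if found:
                return found
    return None


ok = {
    "/Company/datasets/order_enriched": ["/Company/datasets/cleaned_orders"],
    "/Company/datasets/cleaned_orders": ["/Company/datasets/raw_orders"],
}
bad = {"a": ["b"], "b": ["a"]}  # two transforms reading each other's output
```

`find_cycle(ok)` returns `None`, while `find_cycle(bad)` returns the offending path `["a", "b", "a"]`.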
from transforms.api import transform_polars, Input, Output


@transform_polars(
    Output("/Company/datasets/fast_summary"),
    data=Input("/Company/datasets/small_table"),
)
def fast_polars(data):
    """Use Polars for small datasets: faster than Spark, no JVM overhead."""
    import polars as pl

    return data.group_by("category").agg(pl.col("amount").sum())
Related skills: `palantir-core-workflow-b`, `palantir-performance-tuning`, `palantir-multi-env-setup`

Install: `npx claudepluginhub jeremylongshore/claude-code-plugins-plus-skills --plugin palantir-pack`