From dak
Guides writing, packaging, executing, and troubleshooting Apache Beam pipelines on Dataflow. Use when creating pipelines, configuring Flex Templates, or analyzing job performance.
Slash command: `/dak:gcp-dataflow`
Reference files:
- `references/bottlenecks_and_parallelism_context.md`
- `references/dataflow_diagnostics_reference.md`
- `references/dataflow_metrics_bigquery.md`
- `references/dataflow_metrics_core_job.md`
- `references/dataflow_metrics_pubsub.md`
- `references/dataflow_metrics_streaming_engine.md`
- `references/python_flex_template_reference.md`
- `references/streaming_horizontal_autoscaling_analysis.md`
- `references/streaming_job_health.md`
Use this section when implementing Dataflow pipeline logic using Apache Beam.
Google provides a variety of pre-built, open source Dataflow templates that can be used for common scenarios. Before implementing a pipeline from scratch, you MUST follow the steps below to check whether a Dataflow template for the pipeline logic you need to implement already exists.
Step 1: Check for a matching Google Dataflow Template
Check the Google-provided templates in the `dataflow-templates` bucket:
- Classic templates: `gs://dataflow-templates/latest`
- Flex Templates: `gs://dataflow-templates/latest/flex`
Use `gcloud storage ls` to list the contents.
Step 2: Confirm template selection
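To shortlist candidates before confirming one with the user, the `gcloud storage ls` listings can be filtered by keyword. This is a minimal sketch, assuming the listing text has one `gs://` URI per line (as `gcloud storage ls` prints it); the function name `shortlist_templates` and the keyword-matching rule are illustrative, not part of any Google tooling.

```python
from typing import Iterable, List


def shortlist_templates(ls_output: str, keywords: Iterable[str]) -> List[str]:
    """Return template URIs whose final path segment contains every keyword.

    `ls_output` is assumed to be the text printed by
    `gcloud storage ls gs://dataflow-templates/latest` (or `.../latest/flex`).
    Matching is case-insensitive.
    """
    wanted = [k.lower() for k in keywords]
    matches = []
    for line in ls_output.splitlines():
        uri = line.strip()
        # Skip blank lines and "directory" entries such as .../latest/flex/
        if not uri.startswith("gs://") or uri.endswith("/"):
            continue
        name = uri.rsplit("/", 1)[-1]
        # Classic templates have a sibling *_metadata object; list only the template.
        if name.endswith("_metadata"):
            continue
        if all(k in name.lower() for k in wanted):
            matches.append(uri)
    return matches


if __name__ == "__main__":
    listing = (
        "gs://dataflow-templates/latest/Word_Count\n"
        "gs://dataflow-templates/latest/Word_Count_metadata\n"
        "gs://dataflow-templates/latest/flex/\n"
    )
    print(shortlist_templates(listing, ["word", "count"]))
```

The returned URIs are only candidates; the final choice is still confirmed with the user.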
Use this section when creating a new project for a Dataflow pipeline from scratch.
`requirements.txt`, and other similar files where versions are specified.
Use this section when configuring a Dataflow Java pipeline project using Gradle.
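This section requires any SLF4J binding to match the major/minor version of the `slf4j-api` that Apache Beam pulls in transitively. The sketch below checks that rule against versions read off the resolved dependency graph (for example from `./gradlew dependencies --configuration runtimeClasspath`). It is written in Python to match the other helpers here; the function names and example versions are hypothetical.

```python
from typing import Dict, List, Tuple


def major_minor(version: str) -> Tuple[int, int]:
    """Parse the leading major/minor numbers, e.g. '1.7.36' -> (1, 7)."""
    major, minor = version.split(".")[:2]
    return int(major), int(minor)


def mismatched_bindings(slf4j_api: str, bindings: Dict[str, str]) -> List[str]:
    """Return binding artifacts whose major/minor differs from the resolved slf4j-api."""
    target = major_minor(slf4j_api)
    return sorted(name for name, version in bindings.items()
                  if major_minor(version) != target)


if __name__ == "__main__":
    # Hypothetical resolved versions.
    print(mismatched_bindings("1.7.36", {"slf4j-simple": "1.7.36", "slf4j-jdk14": "2.0.9"}))
```

Any artifact the check returns would have its version aligned to the resolved `slf4j-api` in `build.gradle`.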
- `com.github.johnrengelman.shadow`) unless the user explicitly requests a Fat Jar.
- `application` plugin for passing command-line parameters.
- `slf4j-api` version pulled transitively by Apache Beam.
- `slf4j-simple`, `logback-classic`, etc.) to exactly match the major/minor version of the resolved `slf4j-api`.
Use this section to package pipeline code as a Flex Template.
Flex Templates offer a hermetic and reproducible launch environment for a
pipeline. They are easy to launch with gcloud or with orchestrators like Cloud
Composer. You MUST package the pipeline as a Flex Template when creating new
Dataflow pipeline projects.
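As a sketch of the packaging step, the helper below assembles a `gcloud dataflow flex-template build` invocation from an already-pushed container image (for example, one built with Cloud Build). The helper name is hypothetical, and the bucket, Artifact Registry path, and `metadata.json` file name in the usage lines are placeholders.

```python
import shlex
from typing import List, Optional


def flex_template_build_cmd(template_gcs_path: str, image: str,
                            sdk_language: str = "PYTHON",
                            metadata_file: Optional[str] = None) -> List[str]:
    """Build argv for `gcloud dataflow flex-template build`.

    `template_gcs_path` is where the template spec JSON will be written;
    `image` is the container image that launches the pipeline.
    """
    if not template_gcs_path.startswith("gs://"):
        raise ValueError("the template spec must be written to a gs:// path")
    if sdk_language not in {"JAVA", "PYTHON", "GO"}:
        raise ValueError(f"unsupported SDK language: {sdk_language}")
    cmd = ["gcloud", "dataflow", "flex-template", "build", template_gcs_path,
           f"--image={image}", f"--sdk-language={sdk_language}"]
    if metadata_file:
        # Optional parameter metadata shown to whoever launches the template.
        cmd.append(f"--metadata-file={metadata_file}")
    return cmd


if __name__ == "__main__":
    argv = flex_template_build_cmd(
        "gs://my-bucket/templates/my_pipeline.json",
        "us-central1-docker.pkg.dev/my-project/my-repo/my-pipeline:latest",
        metadata_file="metadata.json",
    )
    print(shlex.join(argv))
```

With the Single Docker Image Configuration, the same image URI is also what the job later receives as `sdk_container_image`.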
Follow the steps below:
- `--sdk_container_image`). Does the Python pipeline require extra dependencies (e.g., using `--requirements_file`, `--setup_file`, or `--extra_package`)? If so, YOU MUST recommend the Single Docker Image Configuration for the Flex Template. See `python_flex_template_reference.md` for details.
- `cloudbuild.yaml` out-of-the-box for building and pushing images unless local setup is explicitly requested.
- `DoFn.setup()` lifecycle using the Secret Manager client library (writing them to ephemeral worker disk like `/tmp` only if physical file paths are strictly required). Ensure the Dataflow Worker Service Account has the `roles/secretmanager.secretAccessor` role.
Use this section when the user has selected a Google-provided template (Classic or Flex) and you need to configure it.
Step 1: Get template metadata
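The two metadata locations described in this step can be turned into URIs mechanically. This is a minimal sketch, assuming only the `latest` release folder and the naming shown in the examples (`<name>_metadata` for Classic templates, `flex/<name>` for Flex Templates); the helper names are hypothetical. For Flex Templates the object is the template spec file, which nests the parameter metadata under a `metadata` key.

```python
import json
from typing import List


def template_metadata_uri(template_name: str, flex: bool) -> str:
    """Return the GCS object that holds a Google-provided template's metadata."""
    base = "gs://dataflow-templates/latest"
    if flex:
        return f"{base}/flex/{template_name}"
    return f"{base}/{template_name}_metadata"


def read_cmd(uri: str) -> List[str]:
    """argv that prints the object's JSON to stdout."""
    return ["gcloud", "storage", "cat", uri]


def unwrap_metadata(raw_json: str) -> dict:
    """Return the metadata dict from a Classic *_metadata file or a Flex spec file."""
    doc = json.loads(raw_json)
    return doc.get("metadata", doc)
```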
- `gs://dataflow-templates` and end with `_metadata` (e.g., `gs://dataflow-templates/latest/Word_Count_metadata`).
- `gs://dataflow-templates/latest/flex` (e.g., `gs://dataflow-templates/latest/flex/Cloud_Datastream_to_BigQuery`).
- `javascriptTextTransformGcsPath`, `javascriptTextTransformFunctionName`), refer to the UDF guide to write and configure the UDF.
- `extraFilesToStage` parameter. The runner will drop them into `/extra_files` on worker VMs. Refer to the SSL certificates guide for the local referencing syntax (`/extra_files/...`).
Step 2: Get network configuration
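Once networks and subnetworks are listed, the chosen subnetwork must be in the job's region and passed in a form Dataflow accepts: `regions/REGION/subnetworks/SUBNET` for a subnetwork in the job's own project, or the full `https://www.googleapis.com/compute/v1/...` URL, which is required for a Shared VPC host project. The sketch assumes JSON from `gcloud compute networks subnets list --format=json`. It also surfaces `privateIpGoogleAccess`, since workers without public IPs need Private Google Access to reach Google APIs. Function names are hypothetical.

```python
import json
from typing import Dict, List, Optional


def subnets_in_region(subnets_json: str, region: str) -> List[Dict[str, object]]:
    """Keep subnets whose region matches the job region.

    Each subnet's `region` field is a resource URL ending in /regions/<region>.
    """
    result = []
    for subnet in json.loads(subnets_json):
        if subnet.get("region", "").rsplit("/", 1)[-1] == region:
            result.append({
                "name": subnet["name"],
                "private_google_access": bool(subnet.get("privateIpGoogleAccess", False)),
            })
    return result


def subnetwork_param(region: str, subnet: str, host_project: Optional[str] = None) -> str:
    """Format the value for the Dataflow subnetwork option."""
    if host_project:  # Shared VPC: the complete URL is required.
        return ("https://www.googleapis.com/compute/v1/projects/"
                f"{host_project}/regions/{region}/subnetworks/{subnet}")
    return f"regions/{region}/subnetworks/{subnet}"
```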
- `gcloud` commands to list networks and subnetworks.
Step 3: Identify required parameters and prepare resources
[!IMPORTANT]
Strict parameter validation: Any parameter in the metadata JSON that does NOT explicitly have `"isOptional": true` is **strictly required** by the Dataflow API. This applies even if the description suggests it has a default value (e.g., `csvFormat` or `badRecordsOutputTable` in some templates). You must identify and supply all of them.
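The validation rule above can be applied mechanically to the metadata JSON from Step 1. This is a minimal sketch, assuming the usual layout with a top-level `parameters` list (nested under `metadata` in a Flex Template spec file); the function names and the parameter names in the test data are hypothetical.

```python
import json
from typing import Dict, List


def required_parameters(metadata_json: str) -> List[str]:
    """Names of parameters that are NOT explicitly marked "isOptional": true.

    Descriptions and default-value hints are deliberately ignored.
    """
    doc = json.loads(metadata_json)
    meta = doc.get("metadata", doc)  # Flex Template spec files nest the metadata
    return [p["name"] for p in meta.get("parameters", [])
            if p.get("isOptional") is not True]


def missing_parameters(metadata_json: str, supplied: Dict[str, str]) -> List[str]:
    """Required parameters that the launch configuration does not yet supply."""
    return [name for name in required_parameters(metadata_json)
            if name not in supplied]
```

A parameter with `"isOptional": false` or with no `isOptional` key at all is treated as required, matching the rule above.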
- `schemaJSONPath` or `JSONPath`) is required:
Different templates might require specific resources to be prepared in the target sink before execution. Follow the instructions for your target sink below.
When running templates that write to BigQuery, you MUST ensure the following resources are prepared to prevent job failures:
- `bq mk`) before launching the job. Ensure the schema matches the template's expectations.
- `badRecordsOutputTable` or `outputDeadletterTable`). Some templates attempt to auto-create this table; however, pre-creating it is a best practice because it ensures the correct schema and permissions.
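Pre-creating the dataset, the output table, and the dead-letter table can be scripted with `bq mk`. This is a sketch under stated assumptions: the dataset location, table names, and schema file paths are placeholders you supply, the helper names are hypothetical, and `bq mk --dataset` fails if the dataset already exists, so it is only needed once.

```python
from typing import List


def bq_dataset_cmd(project: str, dataset: str, location: str) -> List[str]:
    """argv for creating a dataset in a given location."""
    return ["bq", f"--location={location}", "mk", "--dataset", f"{project}:{dataset}"]


def bq_table_cmd(project: str, dataset: str, table: str, schema_file: str) -> List[str]:
    """argv for creating a table from a local JSON schema file."""
    return ["bq", "mk", "--table", f"{project}:{dataset}.{table}", schema_file]


if __name__ == "__main__":
    # Placeholder names: one output table and one dead-letter table.
    for argv in (bq_dataset_cmd("my-project", "my_dataset", "US"),
                 bq_table_cmd("my-project", "my_dataset", "events", "events_schema.json"),
                 bq_table_cmd("my-project", "my_dataset", "events_errors", "deadletter_schema.json")):
        print(" ".join(argv))
```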
- `v1/` or `v2/` directories).
- `badRecordsOutputTable` or `outputDeadletterTable`).
- `PubSubToBigQuery.java`, the schema is set using `ResourceUtils.getDeadletterTableSchemaJson()`. Tracing `ResourceUtils.java` shows that it loads the schema from `streaming_source_deadletter_table_schema.json` on GitHub.
- `RawContent`/`ErrorMsg` schema shown in the CSV to BigQuery DevSite Doc.
Use this section when preparing to run a custom Apache Beam pipeline on Dataflow.
When launching Python pipelines with `DataflowRunner` without a Flex Template, you MUST scan the pipeline project directory for the following files:
- `requirements.txt`: If present, pass it with the `--requirements_file` pipeline option.
- `setup.py`: If present, pass it with the `--setup_file` pipeline option. This is critical if the pipeline uses local modules or packages.
When launching Python pipelines with a Flex Template, if the Flex Template image is also the SDK container image (Single Docker Image Configuration), then you MUST supply the image in the `sdk_container_image` parameter.
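The directory scan for the non-Flex case can be sketched as follows. This is an illustration, not Beam tooling: the helper name is hypothetical, and the `is_file` argument exists only so the check can be exercised without touching the file system. In the Single Docker Image Configuration the dependencies are already baked into the image, so the image goes into `sdk_container_image` instead of these flags.

```python
import os
from typing import Callable, List


def runner_dependency_flags(project_dir: str,
                            is_file: Callable[[str], bool] = os.path.isfile) -> List[str]:
    """Pipeline options for the dependency files found in `project_dir`."""
    flags = []
    requirements = os.path.join(project_dir, "requirements.txt")
    if is_file(requirements):
        # PyPI dependencies to be installed on the Dataflow workers.
        flags.append(f"--requirements_file={requirements}")
    setup = os.path.join(project_dir, "setup.py")
    if is_file(setup):
        # Packages local modules so the workers can import them.
        flags.append(f"--setup_file={setup}")
    return flags
```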
- `your-gcp-project-id`) for GCP resources when drafting run scripts or configs. Action: If values are unknown, proactively run commands like `gcloud config get-value project` to find active resources and pre-fill scripts for the user. Confirm the values with the user before proceeding.
Use this section when configuration is complete and you are ready to launch any Dataflow job (Google-provided template, custom Flex Template, or standalone pipeline).
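The two launch paths named in this section can be assembled so that workers default to private IPs: `--disable-public-ips` for `gcloud dataflow flex-template run`, and `--no_use_public_ips` for a Python pipeline run directly with `DataflowRunner`. This is a sketch with hypothetical helper names and placeholder values; private-IP workers also need a subnetwork with Private Google Access enabled.

```python
from typing import Dict, List, Optional


def flex_run_cmd(job_name: str, template_gcs_path: str, region: str,
                 parameters: Dict[str, str], subnetwork: Optional[str] = None,
                 public_ips: bool = False) -> List[str]:
    """argv for `gcloud dataflow flex-template run`."""
    cmd = ["gcloud", "dataflow", "flex-template", "run", job_name,
           f"--template-file-gcs-location={template_gcs_path}",
           f"--region={region}"]
    if not public_ips:
        cmd.append("--disable-public-ips")
    if subnetwork:
        cmd.append(f"--subnetwork={subnetwork}")
    if parameters:
        if any("," in value for value in parameters.values()):
            # Values containing commas need gcloud's alternate-delimiter syntax
            # (see `gcloud topic escaping`); not handled in this sketch.
            raise ValueError("parameter values containing commas are not supported here")
        cmd.append("--parameters=" + ",".join(f"{k}={v}" for k, v in parameters.items()))
    return cmd


def standalone_run_cmd(main_py: str, project: str, region: str, temp_location: str,
                       subnetwork: Optional[str] = None,
                       public_ips: bool = False) -> List[str]:
    """argv for running a Python pipeline directly on Dataflow."""
    cmd = ["python", main_py, "--runner=DataflowRunner", f"--project={project}",
           f"--region={region}", f"--temp_location={temp_location}"]
    if not public_ips:
        cmd.append("--no_use_public_ips")
    if subnetwork:
        cmd.append(f"--subnetwork={subnetwork}")
    return cmd
```

Public IPs are opt-in (`public_ips=True`) so that the private default described below holds unless the user asks otherwise.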
- `gcloud dataflow flex-template run` or `python main.py --runner=DataflowRunner`). Ensure workers default to a private IP configuration unless specified otherwise, and verify permissions in the target project.
Use this section to monitor the progress of a running Dataflow job.
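Polling a job until it reaches `RUNNING` (typical for streaming) or `DONE` (batch) can be sketched as below. The state names are the Dataflow API's `JOB_STATE_*` values as printed by `gcloud dataflow jobs describe JOB_ID --region=REGION --format="value(currentState)"`. The helper names, polling interval, and timeout are assumptions, and the state source is injectable so the loop can be tested without calling `gcloud`.

```python
import subprocess
import time
from typing import Callable, Iterable

# States after which the job will not progress further on its own.
TERMINAL_STATES = {"JOB_STATE_DONE", "JOB_STATE_FAILED", "JOB_STATE_CANCELLED",
                   "JOB_STATE_UPDATED", "JOB_STATE_DRAINED"}


def gcloud_job_state(job_id: str, region: str) -> str:
    """Read the job's currentState with gcloud."""
    result = subprocess.run(
        ["gcloud", "dataflow", "jobs", "describe", job_id,
         f"--region={region}", "--format=value(currentState)"],
        check=True, capture_output=True, text=True)
    return result.stdout.strip()


def wait_for_state(get_state: Callable[[], str],
                   targets: Iterable[str] = ("JOB_STATE_RUNNING", "JOB_STATE_DONE"),
                   poll_seconds: float = 30.0, timeout_seconds: float = 1800.0,
                   sleep: Callable[[float], None] = time.sleep) -> str:
    """Poll until a target or terminal state is reached; return that state."""
    wanted = set(targets)
    deadline = time.monotonic() + timeout_seconds
    while True:
        state = get_state()
        if state in wanted or state in TERMINAL_STATES:
            return state
        if time.monotonic() >= deadline:
            raise TimeoutError(f"job is still in {state}")
        sleep(poll_seconds)
```

Typical usage is `wait_for_state(lambda: gcloud_job_state("JOB_ID", "us-central1"))`; the caller still has to treat a returned `JOB_STATE_FAILED` or `JOB_STATE_CANCELLED` as a failure.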
- `RUNNING` or `DONE` state.
[!IMPORTANT]
YOU MUST use this section when the user asks about the performance of their Dataflow pipelines. It can be used to debug issues such as pipeline slowness and pipeline failures.
Understand User Request: Extract Job ID, Project ID, Transform Name (optional), and Time Window.
Transform Name Mapping: If the user requires transform-based debugging, map the user-provided transform names to the actual Dataflow stage or ptransform names and apply them as filters when querying:
This mapping can be extracted from `gcloud dataflow jobs describe JOB_ID --full --format="json(pipelineDescription.executionPipelineStage)"`.
- `name` property at the parent stage level. This matches `"F[digit]"` (e.g., `"F6"`).
- `componentTransform` array, read precisely from `userName` or `originalTransform` (e.g., `"RateLimitAndLog/ParMultiDo(RateLimitAndLog)"`), and use it as ptransform.
- `resource.labels.step_id="[Extracted ptransform name]"`.
- `metric.labels.ptransform="[Extracted ptransform name]"` or `metric.labels.stage="[Extracted stage_id]"`.
Query Telemetry:
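The Transform Name Mapping step can be sketched as below. It walks `pipelineDescription.executionPipelineStage` in the JSON printed by the `gcloud dataflow jobs describe ... --full` command, pairs each stage `name` (such as `F6`) with the component-transform names that contain the user's transform name, and formats the filter clauses listed in that step. The substring match, the preference for `userName` over `originalTransform`, and the helper names are assumptions.

```python
import json
from typing import Dict, List, Tuple


def map_transform(describe_json: str, user_transform: str) -> List[Tuple[str, str]]:
    """Return (stage name, ptransform name) pairs whose ptransform contains user_transform."""
    doc = json.loads(describe_json)
    stages = doc.get("pipelineDescription", {}).get("executionPipelineStage", [])
    pairs = []
    for stage in stages:
        stage_name = stage.get("name", "")  # e.g. "F6"
        for component in stage.get("componentTransform", []):
            ptransform = component.get("userName") or component.get("originalTransform", "")
            if user_transform in ptransform:
                pairs.append((stage_name, ptransform))
    return pairs


def _quoted(value: str) -> str:
    """Double-quote a filter value, escaping embedded quotes."""
    return '"' + value.replace('"', '\\"') + '"'


def telemetry_filters(stage: str, ptransform: str) -> Dict[str, str]:
    """Filter clauses keyed by the label they target."""
    return {
        "step_id": f"resource.labels.step_id={_quoted(ptransform)}",
        "ptransform": f"metric.labels.ptransform={_quoted(ptransform)}",
        "stage": f"metric.labels.stage={_quoted(stage)}",
    }
```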
Analysis:
Output: Provide a synthesized diagnosis containing symptoms, root
causes, and target code links (using file:///... format). Strictly follow
the response structure appropriate for the job type:
For Streaming Jobs:
- `job/data_watermark_age` / `job/per_stage_data_watermark_age` and system lag.
- `job/elements_produced_count` / `job/estimated_bytes_produced_count`.
- `job/estimated_backlog_processing_time` / `job/backlog_bytes`.
- `job/is_bottleneck` (interpreting `likely_cause` / `bottleneck_kind`) and the key metrics `job/backlogged_keys` / `job/processing_parallelism_keys`, interpreted in the context of `bottlenecks_and_parallelism_context`.
- `job/horizontal_worker_scaling` (and label rationale), clamp limits (`job/max_worker_instances_limit` / `job/min_worker_instances_limit`), and utilization hints, interpreted in the context of `streaming_horizontal_autoscaling_analysis`.
- `file:///` URIs).
For Batch Jobs:
- `job/elements_produced_count` (primary performance indicator).
- `file:///` URIs).
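Both response structures end with code links as `file:///` URIs. One minimal way to produce them from source paths, assuming the paths refer to files on the machine where the pipeline code lives; the helper names are hypothetical. `Path.as_uri()` percent-encodes characters such as spaces.

```python
import os
from pathlib import Path
from typing import Iterable, List


def code_link(path: str) -> str:
    """Absolute file:/// URI for a source file (symlinks are not resolved)."""
    return Path(os.path.abspath(path)).as_uri()


def code_links(paths: Iterable[str]) -> List[str]:
    """De-duplicated links, in first-seen order."""
    seen = set()
    links = []
    for path in paths:
        link = code_link(path)
        if link not in seen:
            seen.add(link)
            links.append(link)
    return links
```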