One-Line Summary: Cloud Composer is GCP's managed Airflow, the standard tool for stitching together ingestion, transformation, and refresh jobs into a single dependable DAG.

Prerequisites: Modules 02–05.

What's the Concept?

Once you have more than two scheduled pipelines, you need orchestration. The questions multiply: did the bronze ingest finish before silver started? If silver failed, did we still embed gold? If yesterday's run was incomplete, do we re-run today's, or backfill yesterday first? Hand-rolled cron plus emails doesn't scale past about three pipelines.

Apache Airflow has been the open-source industry answer for a decade. Cloud Composer is GCP's managed flavor — same DAGs, same operators, same Python. You write workflows as Python files; Composer schedules and runs them.

How It Works

A Composer DAG for an end-to-end agent pipeline:

# dags/billing_agent_pipeline.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.providers.google.cloud.operators.bigquery import (
    BigQueryInsertJobOperator,
)
from airflow.providers.google.cloud.operators.cloud_run import (
    CloudRunExecuteJobOperator,
)
 
default_args = {
    "owner": "platform-data",
    "retries": 3,
    "retry_delay": timedelta(minutes=5),
    "execution_timeout": timedelta(minutes=30),
}
 
with DAG(
    "billing_agent_pipeline",
    start_date=datetime(2026, 1, 1),
    schedule="0 * * * *",          # hourly
    catchup=False,
    default_args=default_args,
    max_active_runs=1,             # don't overlap runs
) as dag:
 
    # 1. Ingest: pull from Stripe API into bronze
    ingest_stripe = CloudRunExecuteJobOperator(
        task_id="ingest_stripe",
        project_id="myco-prod",
        region="us-central1",
        job_name="stripe-puller",
    )
 
    # 2. Silver: bronze.stripe_charges_raw → silver.orders
    silver_orders = BigQueryInsertJobOperator(
        task_id="silver_orders",
        configuration={
            "query": {
                "query": "{% include 'sql/silver_orders.sql' %}",
                "useLegacySql": False,
            }
        },
    )
 
    # 3. Gold: silver.* → gold.billing_agent_context
    gold_billing = BigQueryInsertJobOperator(
        task_id="gold_billing",
        configuration={
            "query": {
                "query": "{% include 'sql/gold_billing_agent_context.sql' %}",
                "useLegacySql": False,
            }
        },
    )
 
    # 4. Embed: refresh embedding column for changed rows
    embed_tickets = BigQueryInsertJobOperator(
        task_id="embed_tickets",
        configuration={
            "query": {
                "query": "{% include 'sql/embed_tickets.sql' %}",
                "useLegacySql": False,
            }
        },
    )
 
    # 5. Data quality: row counts, freshness, schema checks
    dq_checks = BigQueryInsertJobOperator(
        task_id="dq_checks",
        configuration={
            "query": {"query": "{% include 'sql/dq_checks.sql' %}", "useLegacySql": False}
        },
    )
 
    # Dependency graph
    ingest_stripe >> silver_orders >> [gold_billing, embed_tickets] >> dq_checks

The graph: ingest first, then silver, then gold and embeddings in parallel, then data-quality checks. If anything fails, downstream tasks don't run. Retries happen automatically per the default_args; alerting on persistent failure takes one more line of configuration, sketched below.
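
Out of the box, "alert" means the task is marked failed in the UI; wiring a pager or chat message is one callback in default_args. A minimal sketch, with a print standing in for whatever notification client you actually use:

# Hypothetical failure hook; swap the print for your Slack/PagerDuty client.
def notify_on_failure(context):
    ti = context["task_instance"]
    print(f"ALERT: {ti.dag_id}.{ti.task_id} failed on run {context['run_id']}")

default_args = {
    "owner": "platform-data",
    "retries": 3,
    "retry_delay": timedelta(minutes=5),
    "execution_timeout": timedelta(minutes=30),
    "on_failure_callback": notify_on_failure,  # fires once retries are exhausted
}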

Why It Matters

  • The DAG is documentation. Anyone can open the DAG file or the Composer UI and see exactly what runs in what order, when it last ran, and where it last failed.
  • Retries and backfills are first-class. A failed task retries automatically. A failed run can be re-triggered with one click. Backfilling a date range is a single CLI command (see the example after this list).
  • Cross-system dependencies are explicit. "Wait for the Stripe ingest, then run BigQuery transforms, then call the embedding job" — declared in one Python file, executed reliably.
  • Composer integrates with everything GCP. Operators ship for BigQuery, Dataflow, Cloud Run, Cloud Build, GCS, Pub/Sub. You almost never write API plumbing yourself.
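
For instance, replaying the first three days of January for the DAG above is one command from a workstation. The environment name and dates here are placeholders:

# Composer's CLI wrapper around "airflow dags backfill".
gcloud composer environments run my-composer-env \
    --location us-central1 \
    dags backfill -- \
    --start-date 2026-01-01 --end-date 2026-01-03 \
    billing_agent_pipeline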

Key Technical Details

  • Composer has a minimum cost (small environments around $300/month). For pipelines with only a few DAGs, Cloud Scheduler + Cloud Run / Workflows can be 10× cheaper.
  • Use deferrable operators (BigQueryInsertJobOperator with deferrable=True) for long-running BigQuery jobs — the wait moves to the triggerer instead of holding a worker slot (example after this list).
  • Keep DAGs idempotent. Tasks should produce the same output regardless of how many times they're retried (Module 04 covers this).
  • Use Airflow connections (managed in the Composer UI) for service accounts and API keys. Never put credentials in DAG files.
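
A deferrable version of the gold-layer task from the DAG above is the same operator plus one flag, assuming the environment runs an Airflow triggerer (Composer 2 environments include one):

gold_billing = BigQueryInsertJobOperator(
    task_id="gold_billing",
    configuration={
        "query": {
            "query": "{% include 'sql/gold_billing_agent_context.sql' %}",
            "useLegacySql": False,
        }
    },
    deferrable=True,  # wait in the triggerer, not on a worker slot
)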

Common Misconceptions

"Composer can run any Python code, so it can do the transformations itself." It can, but it shouldn't. Composer's role is orchestration. The transformation logic belongs in BigQuery, Dataflow, or Cloud Run — not in DAG Python. Keeps each tier responsible for one thing.

"One giant DAG for everything." Smaller is better. One DAG per agent use case, or one per data domain, is the typical carve-up. Cross-DAG dependencies use ExternalTaskSensor when needed.

"Composer scheduling is real-time." Airflow's scheduler runs on a polling loop with a minimum granularity of ~1 minute. For sub-minute triggers, use Eventarc or Pub/Sub push subscriptions (covered in 04-event-driven-pipelines-with-eventarc.md).

Connections to Other Concepts

  • 02-dataflow-for-heavy-transforms.md — Heavy transforms triggered from a DAG.
  • 04-event-driven-pipelines-with-eventarc.md — The event-driven complement to scheduled DAGs.
  • Course 07-operating-the-system/01-observability-and-data-quality-monitoring.md — Hooking Composer alerts into the broader observability stack.

Further Reading

  • Google Cloud, "Cloud Composer overview" docs.
  • "Airflow: The Hands-On Guide" by Marc Lamberti — Practical reference for DAG patterns.
  • Maxime Beauchemin (Airflow's creator), "Functional Data Engineering" — Why DAGs are the right abstraction.