Building Your First DAG with Apache Airflow

Apache Airflow is a powerful platform for orchestrating complex workflows. After learning the Fundamentals and installing Airflow with Docker, it’s time to dive into one of its most essential features – the Directed Acyclic Graph (DAG). In this article, we’ll explore what a DAG is, break down its structure, and build a complete example DAG that demonstrates an end-to-end ETL (Extract, Transform, Load) process.

The Anatomy of a DAG

Let’s break down the components that make up a DAG in Apache Airflow (a minimal skeleton that ties all five together follows the list):

  1. Default Arguments (default_args):
    These are key-value pairs that define common parameters for tasks. They include settings such as the owner, retry behavior, start date, and email notifications. Centralizing these settings ensures consistency across all tasks in the DAG.
  2. DAG Object:
    The DAG object is defined by its ID, a description, the default arguments, and the schedule interval. It acts as a container for all the tasks.
  3. Operators and Tasks:
    Tasks are the individual units of work. Operators (e.g., PythonOperator, BashOperator) encapsulate the logic to execute a particular task. Each task is associated with a unique task ID.
  4. Task Dependencies:
    You establish the order in which tasks run by setting dependencies. This can be done using bitshift operators (e.g., task1 >> task2) or methods like set_downstream() and set_upstream().
  5. Scheduling:
    The schedule_interval defines how often your DAG runs. It can be set to a cron expression or preset (such as @daily), a timedelta, or None if you prefer manual triggering.
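
To make these pieces concrete, here is a minimal sketch of a standalone DAG (a hypothetical minimal_example_dag, not the cocktail pipeline we build below) that wires all five components together:

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator


def _say_hello():
    print("Hello from Airflow")


# 1. Default arguments shared by every task unless overridden
default_args = {
    "owner": "airflow",
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}

# 2. The DAG object is the container for the tasks
with DAG(
    "minimal_example_dag",
    default_args=default_args,
    start_date=datetime(2025, 1, 1),
    schedule_interval="@daily",   # 5. Scheduling: cron expression, preset, timedelta, or None
    catchup=False,
) as dag:

    # 3. Operators define the work; every task has a unique task_id
    print_date = BashOperator(task_id="print_date", bash_command="date")
    say_hello = PythonOperator(task_id="say_hello", python_callable=_say_hello)

    # 4. Dependencies: run print_date first, then say_hello
    print_date >> say_hello   # equivalent to print_date.set_downstream(say_hello)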

Step 1: Set Up Your Environment

Before you begin, ensure Apache Airflow is installed and running. If you’re using Docker, you likely launched Airflow via a docker-compose.yaml file, which starts the scheduler, webserver, and other core components. To add your workflows, place DAG files in the mounted dags/ directory – this allows Airflow to automatically detect and execute them.

If you’re new to Airflow or unsure how to structure your project, the Astro CLI simplifies setup. This open-source tool initializes a fully configured Airflow environment with one command:

astro dev init --from-template

This generates all necessary files and even includes an example DAG to help you start experimenting immediately. Read Create an Astro project for more details on the project structure.

Step 2: Create Your DAG File

Create a new Python file (cocktail_api_dag.py) in your dags/ folder. This file will contain all the logic for your DAG.

Step 3: Import Required Modules

At the beginning of the file, we import the necessary modules: core Airflow classes, operators for interacting with PostgreSQL and HTTP endpoints, and helper libraries such as json, pandas for data normalization, and datetime for scheduling. We also create a module-level logger that the helper functions use.

# Core Airflow imports
from airflow import DAG
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.providers.http.sensors.http import HttpSensor
from airflow.providers.http.operators.http import SimpleHttpOperator
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.exceptions import AirflowException

# Standard library imports
from datetime import datetime, timedelta
from typing import Any
import json
import logging 

# Third party imports
from pandas import json_normalize

logger = logging.getLogger(__name__)

  • DAG: The central object that orchestrates the workflow.
  • Operators and Sensors: Specialized classes to perform tasks such as running SQL commands, making HTTP calls, and executing Python functions.
  • PostgresHook: Provides an interface to interact with a PostgreSQL database.
  • json_normalize: Used to convert JSON data into a flat table, ideal for processing API responses (see the short example after this list).
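
For instance, json_normalize turns a dictionary-shaped API payload into a one-row DataFrame; the record below is just illustrative:

from pandas import json_normalize

# Illustrative record shaped like a single CocktailDB drink entry (made-up values)
record = {
    "idDrink": "11007",
    "strDrink": "Margarita",
    "strGlass": "Cocktail glass",
}

df = json_normalize(record)                           # one flat row, one column per key
print(df.to_csv(index=False, header=False).strip())   # -> 11007,Margarita,Cocktail glass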

Step 4: Defining Helper Functions

Two helper functions are defined outside the DAG context. These functions will later be used by PythonOperators to process and store the data.

a. Data Processing Function

def _fetch_cocktail(ti: Any) -> None:
    """
    Process cocktail data from the API response
    
    Args:
        ti: Airflow's task instance object for accessing XCom data
        
    Flow:
        1. Pulls data from XCom (shared by extract_cocktail task)
        2. Validates the data structure and required fields
        3. Transforms data into CSV format
        4. Saves to a temporary file for database loading
        
    Raises:
        ValueError: When data is missing or invalid
        AirflowException: For any processing failures
    """
    try:
        # Get data from previous task via XCom
        cocktail_data = ti.xcom_pull(task_ids="extract_cocktail")
        if not cocktail_data or 'drinks' not in cocktail_data:
            raise ValueError("No cocktail data received from API")
            
        # Extract first drink from results
        cocktail = cocktail_data['drinks'][0]
        logger.info(f"Processing cocktail: {cocktail.get('strDrink', 'Unknown')}")
        
        # Define and validate required fields
        required_fields = ['idDrink', 'strDrink', 'strCategory', 'strAlcoholic', 'strGlass']
        if not all(field in cocktail for field in required_fields):
            missing_fields = [f for f in required_fields if f not in cocktail]
            raise ValueError(f"Missing required fields: {missing_fields}")
        
        # Transform data to normalized format and save to CSV
        result = json_normalize({field: cocktail[field] for field in required_fields})
        result.to_csv('/tmp/fetch_cocktail.csv', index=None, header=False)
        logger.info(f"Successfully processed cocktail: {cocktail['strDrink']}")
        
    except Exception as e:
        logger.error(f"Error processing cocktail: {str(e)}")
        raise AirflowException(f"Cocktail processing failed: {str(e)}")
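
Because _fetch_cocktail only depends on the task instance's xcom_pull, you can exercise it outside Airflow with a fake task instance. This is a minimal sketch (the sample drink values are illustrative, and it assumes the function and logger defined above):

from unittest.mock import MagicMock

# Fake task instance whose xcom_pull returns a canned payload
fake_ti = MagicMock()
fake_ti.xcom_pull.return_value = {
    "drinks": [{
        "idDrink": "11007",
        "strDrink": "Margarita",
        "strCategory": "Ordinary Drink",
        "strAlcoholic": "Alcoholic",
        "strGlass": "Cocktail glass",
    }]
}

_fetch_cocktail(fake_ti)   # writes /tmp/fetch_cocktail.csv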

b. Data Storage Function

def _store_cocktail() -> None:
    """
    Store processed cocktail data in PostgreSQL database
    
    Flow:
        1. Connects to PostgreSQL using Airflow's hook
        2. Copies data from temporary CSV file to database
        3. Cleans up temporary file
        
    Note:
        - Uses COPY command for efficient data loading
        - Ensures cleanup of temporary files even if operation fails
        
    Raises:
        AirflowException: When database operations fail
    """
    try:
        # Initialize PostgreSQL connection
        hook = PostgresHook(postgres_conn_id='postgres')
        
        # Copy data from CSV to database using PostgreSQL COPY command
        hook.copy_expert(
            sql="COPY cocktail FROM stdin WITH DELIMITER as ','",
            filename='/tmp/fetch_cocktail.csv'
        )
        logger.info("Successfully stored cocktail in database")
        
    except Exception as e:
        logger.error(f"Database operation failed: {str(e)}")
        raise AirflowException(f"Failed to store cocktail: {str(e)}")
    finally:
        # Clean up temporary file regardless of success/failure
        import os
        if os.path.exists('/tmp/fetch_cocktail.csv'):
            os.remove('/tmp/fetch_cocktail.csv')

Step 5: Define Default Arguments and Initialize the DAG

Define the default_args dictionary to include common settings for all tasks. This helps manage task-level parameters such as the owner, retry count, retry delay, and failure notifications. DAG-level settings, such as the start date, schedule, and UI tags, are passed directly to the DAG object instead.

# DAG default arguments
# These arguments will be applied to all tasks in the DAG unless overridden
default_args = {
    'owner': 'airflow',                    # Owner of the DAG
    'depends_on_past': False,              # Tasks don't depend on past runs
    'retries': 2,                          # Number of retries if task fails
    'retry_delay': timedelta(minutes=5),   # Delay between retries
    'email_on_failure': True,              # Send email when task fails
}

# DAG definition
# This is the main DAG configuration that defines the workflow
with DAG(
    'cocktail_api_dag',
    default_args=default_args,
    description='Fetches and stores random cocktail data from CocktailDB API',
    start_date=datetime(2025, 1, 1),
    schedule_interval='@daily',
    catchup=False,
    max_active_runs=1,
    tags=["cocktail", "api"]               # Tags for organizing DAGs in the UI
) as dag:

Step 6: Creating the DAG and Its Tasks

The DAG is defined using a context manager (with DAG(...) as dag:), which ensures that all tasks declared within the block are automatically associated with this DAG. The task snippets in the subsections below are shown without indentation for readability; in your file, they must be indented inside the with block, as in the complete listing at the end of the article.

a. Creating the PostgreSQL Table

# Task definitions
# Task 1: Creates the PostgreSQL table if it doesn't exist
# - Connects to PostgreSQL using Airflow's PostgresOperator
# - Executes SQL command to create the 'cocktail' table
# - Ensures the table has unique constraint on 'idDrink'
create_table = PostgresOperator(
    task_id='create_table',
    postgres_conn_id='postgres',
    sql='''
        CREATE TABLE IF NOT EXISTS cocktail (
            idDrink INT NOT NULL,
            strDrink TEXT NOT NULL,
            strCategory TEXT NOT NULL,
            strAlcoholic TEXT NOT NULL, 
            strGlass TEXT NOT NULL,
            CONSTRAINT unique_drink UNIQUE (idDrink)
        );
    '''
)

b. Checking API Availability

# Task 2: Checks if the CocktailDB API is available
# - Acts as a sensor that polls the API endpoint
# - Waits up to 5 minutes (300 seconds) for API to respond
# - Checks every 60 seconds (1 minute)
# - Uses 'reschedule' mode to free up worker slots while waiting
poll_api = HttpSensor(
    task_id='poll_api',
    http_conn_id='cocktail_api',
    endpoint='api/json/v1/1/random.php',
    poke_interval=60,
    timeout=300,
    mode='reschedule'
)

c. Extracting Data from the API

# Task 3: Fetches a random cocktail from the API
# - Makes an HTTP GET request to the API
# - Converts JSON response to Python dictionary
# - Stores the result in XCom for next task
# - Logs the API response for debugging
extract_cocktail = SimpleHttpOperator(
    task_id='extract_cocktail',
    http_conn_id='cocktail_api',
    endpoint='api/json/v1/1/random.php',
    method='GET',
    response_filter=lambda response: json.loads(response.text),
    log_response=True
)

d. Processing the Extracted Data

# Task 4: Processes the cocktail data
# - Gets data from previous task using XCom
# - Validates required fields are present
# - Transforms data into correct format
# - Saves to temporary CSV file
# - Includes error handling and logging
fetch_cocktail = PythonOperator(
    task_id='fetch_cocktail',
    python_callable=_fetch_cocktail,
    retry_delay=timedelta(minutes=2)
)

e. Storing the Processed Data

# Task 5: Stores the processed data in PostgreSQL
# - Reads from temporary CSV file
# - Copies data into PostgreSQL table
# - Cleans up temporary file after completion
# - Includes error handling and logging
store_cocktail = PythonOperator(
    task_id='store_cocktail',
    python_callable=_store_cocktail,
    retry_delay=timedelta(minutes=2)
)

f. Establishing Task Dependencies

# Define task dependencies (execution order)
# create_table -> poll_api -> extract_cocktail -> fetch_cocktail -> store_cocktail
create_table >> poll_api >> extract_cocktail >> fetch_cocktail >> store_cocktail

This chain guarantees that each step completes before the next begins.
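
If you prefer a function call over bitshift operators, the same ordering can be expressed with Airflow's chain helper; this sketch is equivalent and would sit inside the same with DAG block:

from airflow.models.baseoperator import chain

# Same execution order as the bitshift expression above
chain(create_table, poll_api, extract_cocktail, fetch_cocktail, store_cocktail)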

Note: Before running the DAG, ensure that the following Airflow connections are properly configured:

  1. Connection ID: cocktail_api
    • Type: HTTP
    • Host: https://www.thecocktaildb.com/
  2. Connection ID: postgres
    • Type: Postgres
    • Host: postgres (the service name of the Postgres container on the Docker network)
    • Login: airflow
    • Port: 5432

You can configure these connections in the Airflow UI under Admin > Connections (or, alternatively, with the airflow connections add CLI command or AIRFLOW_CONN_* environment variables).

Testing Your DAG Locally

Before deploying your DAG, it’s crucial to test it locally to ensure that all tasks execute correctly and dependencies are properly configured. Apache Airflow provides several ways to test and debug DAGs before running them in production.

Validate the DAG Syntax

Before running the DAG, check for syntax errors and misconfigurations by listing the available DAGs:

# List running containers from the compose file
docker-compose ps

# Open a shell inside the webserver container
# (the container name may differ depending on your Compose project name)
docker exec -it airflow-airflow-webserver-1 /bin/bash
airflow dags list

# If your DAG appears in the list, it means Airflow successfully parsed it.
# To further inspect the DAG's structure, use:
airflow dags show cocktail_api_dag
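
You can also validate DAG parsing from Python, for example in a small pytest suite. The sketch below is an optional addition (the dags/ path is an assumption about your project layout):

# test_dag_integrity.py - a minimal sketch; adjust dag_folder to your project layout
from airflow.models import DagBag


def test_no_import_errors():
    dag_bag = DagBag(dag_folder="dags/", include_examples=False)
    assert not dag_bag.import_errors, f"DAG import failures: {dag_bag.import_errors}"


def test_cocktail_dag_structure():
    dag_bag = DagBag(dag_folder="dags/", include_examples=False)
    dag = dag_bag.get_dag("cocktail_api_dag")
    assert dag is not None
    assert len(dag.tasks) == 5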

Trigger Individual Tasks for Testing

You can test individual tasks separately to debug them before running the entire DAG. The following command allows you to run a specific task and view its output:

# airflow tasks test <dag_id> <task_id> <execution_date>
# For example, to test the create_table task:
airflow tasks test cocktail_api_dag create_table 2025-01-01

# Similarly, test the other tasks:

airflow tasks test cocktail_api_dag poll_api 2025-01-01
airflow tasks test cocktail_api_dag extract_cocktail 2025-01-01
airflow tasks test cocktail_api_dag fetch_cocktail 2025-01-01
airflow tasks test cocktail_api_dag store_cocktail 2025-01-01

After running all these tasks and validating that they worked, run a query against the Postgres database to confirm that the cocktail table exists and has stored some data:

docker exec -it airflow-postgres-1 /bin/bash
psql -U airflow
select * from information_schema.tables where table_name = 'cocktail';
select * from cocktail;

Run the Entire DAG Manually

To simulate an actual DAG execution, trigger it manually using:

airflow dags trigger cocktail_api_dag
airflow dags list-runs -d cocktail_api_dag

Check the Task Logs in the Airflow UI

For a more interactive debugging experience, open the Airflow web UI (http://localhost:8080) and:

  1. Navigate to DAGs and select cocktail_api_dag.
  2. Click on the Graph or Grid view to visualize task dependencies.
  3. Click on an individual task and go to the Logs tab to inspect execution details.

Disclaimer

It’s important to note that Apache Airflow is not a data processing framework but an orchestration tool designed to manage and schedule workflows. In this DAG, we performed some data processing within an Airflow task to create a complete, real-world example for learning purposes. However, in a production environment, data processing is typically handled by specialized frameworks such as Apache Spark outside of Airflow. Airflow’s primary role is to coordinate tasks and manage dependencies, not to perform heavy data transformations. This example was intentionally simplified to focus on DAG structure without introducing too many external components.

Conclusion

This detailed example demonstrates how to build a robust, real-world DAG in Apache Airflow. We started by creating helper functions for data processing and storage, then moved on to define our tasks using a mix of PostgreSQL operators, HTTP sensors/operators, and Python operators. By setting clear task dependencies, the DAG ensures a smooth, sequential execution of the workflow – from setting up the database table to ingesting data from an API and finally storing processed results.

With this example as a foundation, you can further customize your DAGs to suit various data workflows. Happy DAG-building and orchestrating your data pipelines!

The Final DAG File

"""
DAG to fetch and store cocktail data from CocktailDB API
"""
# Core Airflow imports
from airflow import DAG
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.providers.http.sensors.http import HttpSensor
from airflow.providers.http.operators.http import SimpleHttpOperator
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.exceptions import AirflowException

# Standard library imports
from datetime import datetime, timedelta
from typing import Any
import json
import logging

# Third party imports
from pandas import json_normalize

logger = logging.getLogger(__name__)

def _fetch_cocktail(ti: Any) -> None:
    """
    Process cocktail data from the API response
    
    Args:
        ti: Airflow's task instance object for accessing XCom data
        
    Flow:
        1. Pulls data from XCom (shared by extract_cocktail task)
        2. Validates the data structure and required fields
        3. Transforms data into CSV format
        4. Saves to a temporary file for database loading
        
    Raises:
        ValueError: When data is missing or invalid
        AirflowException: For any processing failures
    """
    try:
        # Get data from previous task via XCom
        cocktail_data = ti.xcom_pull(task_ids="extract_cocktail")
        if not cocktail_data or 'drinks' not in cocktail_data:
            raise ValueError("No cocktail data received from API")
            
        # Extract first drink from results
        cocktail = cocktail_data['drinks'][0]
        logger.info(f"Processing cocktail: {cocktail.get('strDrink', 'Unknown')}")
        
        # Define and validate required fields
        required_fields = ['idDrink', 'strDrink', 'strCategory', 'strAlcoholic', 'strGlass']
        if not all(field in cocktail for field in required_fields):
            missing_fields = [f for f in required_fields if f not in cocktail]
            raise ValueError(f"Missing required fields: {missing_fields}")
        
        # Transform data to normalized format and save to CSV
        result = json_normalize({field: cocktail[field] for field in required_fields})
        result.to_csv('/tmp/fetch_cocktail.csv', index=None, header=False)
        logger.info(f"Successfully processed cocktail: {cocktail['strDrink']}")
        
    except Exception as e:
        logger.error(f"Error processing cocktail: {str(e)}")
        raise AirflowException(f"Cocktail processing failed: {str(e)}")

def _store_cocktail() -> None:
    """
    Store processed cocktail data in PostgreSQL database
    
    Flow:
        1. Connects to PostgreSQL using Airflow's hook
        2. Copies data from temporary CSV file to database
        3. Cleans up temporary file
        
    Note:
        - Uses COPY command for efficient data loading
        - Ensures cleanup of temporary files even if operation fails
        
    Raises:
        AirflowException: When database operations fail
    """
    try:
        # Initialize PostgreSQL connection
        hook = PostgresHook(postgres_conn_id='postgres')
        
        # Copy data from CSV to database using PostgreSQL COPY command
        hook.copy_expert(
            sql="COPY cocktail FROM stdin WITH DELIMITER as ','",
            filename='/tmp/fetch_cocktail.csv'
        )
        logger.info("Successfully stored cocktail in database")
        
    except Exception as e:
        logger.error(f"Database operation failed: {str(e)}")
        raise AirflowException(f"Failed to store cocktail: {str(e)}")
    finally:
        # Clean up temporary file regardless of success/failure
        import os
        if os.path.exists('/tmp/fetch_cocktail.csv'):
            os.remove('/tmp/fetch_cocktail.csv')

# DAG default arguments
# These arguments will be applied to all tasks in the DAG unless overridden
default_args = {
    'owner': 'airflow',                    # Owner of the DAG
    'depends_on_past': False,              # Tasks don't depend on past runs
    'retries': 2,                          # Number of retries if task fails
    'retry_delay': timedelta(minutes=5),   # Delay between retries
    'email_on_failure': True,              # Send email when task fails
}

# DAG definition
# This is the main DAG configuration that defines the workflow
with DAG(
    'cocktail_api_dag',
    default_args=default_args,
    description='Fetches and stores random cocktail data from CocktailDB API',
    start_date=datetime(2025, 1, 1),
    schedule_interval='@daily',
    catchup=False,
    max_active_runs=1,
    tags=["cocktail", "api"]               # Tags for organizing DAGs in the UI
) as dag:

    # Task definitions
    # Task 1: Creates the PostgreSQL table if it doesn't exist
    # - Connects to PostgreSQL using Airflow's PostgresOperator
    # - Executes SQL command to create the 'cocktail' table
    # - Ensures the table has unique constraint on 'idDrink'
    create_table = PostgresOperator(
        task_id='create_table',
        postgres_conn_id='postgres',
        sql='''
            CREATE TABLE IF NOT EXISTS cocktail (
                idDrink INT NOT NULL,
                strDrink TEXT NOT NULL,
                strCategory TEXT NOT NULL,
                strAlcoholic TEXT NOT NULL, 
                strGlass TEXT NOT NULL,
                CONSTRAINT unique_drink UNIQUE (idDrink)
            );
        '''
    )

    # Task 2: Checks if the CocktailDB API is available
    # - Acts as a sensor that polls the API endpoint
    # - Waits up to 5 minutes (300 seconds) for API to respond
    # - Checks every 60 seconds (1 minute)
    # - Uses 'reschedule' mode to free up worker slots while waiting
    poll_api = HttpSensor(
        task_id='poll_api',
        http_conn_id='cocktail_api',
        endpoint='api/json/v1/1/random.php',
        poke_interval=60,
        timeout=300,
        mode='reschedule'
    )

    # Task 3: Fetches a random cocktail from the API
    # - Makes an HTTP GET request to the API
    # - Converts JSON response to Python dictionary
    # - Stores the result in XCom for next task
    # - Logs the API response for debugging
    extract_cocktail = SimpleHttpOperator(
        task_id='extract_cocktail',
        http_conn_id='cocktail_api',
        endpoint='api/json/v1/1/random.php',
        method='GET',
        response_filter=lambda response: json.loads(response.text),
        log_response=True
    )

    # Task 4: Processes the cocktail data
    # - Gets data from previous task using XCom
    # - Validates required fields are present
    # - Transforms data into correct format
    # - Saves to temporary CSV file
    # - Includes error handling and logging
    fetch_cocktail = PythonOperator(
        task_id='fetch_cocktail',
        python_callable=_fetch_cocktail,
        retry_delay=timedelta(minutes=2)
    )

    # Task 5: Stores the processed data in PostgreSQL
    # - Reads from temporary CSV file
    # - Copies data into PostgreSQL table
    # - Cleans up temporary file after completion
    # - Includes error handling and logging
    store_cocktail = PythonOperator(
        task_id='store_cocktail',
        python_callable=_store_cocktail,
        retry_delay=timedelta(minutes=2)
    )

    # Define task dependencies (execution order)
    # create_table -> poll_api -> extract_cocktail -> fetch_cocktail -> store_cocktail
    create_table >> poll_api >> extract_cocktail >> fetch_cocktail >> store_cocktail