Apache Airflow is a powerful platform for orchestrating complex workflows. After learning the Fundamentals and installing Airflow with Docker, it’s time to dive into one of its most essential features – the Directed Acyclic Graph (DAG). In this article, we’ll explore what a DAG is, break down its structure, and build a complete example DAG that demonstrates an end-to-end ETL (Extract, Transform, Load) process.
The Anatomy of a DAG
Let’s break down the components that make up a DAG in Apache Airflow:
- Default Arguments (default_args): Key-value pairs that define common parameters for tasks, such as the owner, retry behavior, start date, and email notifications. Centralizing these settings ensures consistency across all tasks in the DAG.
- DAG Object: Defined by its ID, a description, the default arguments, and the schedule interval. It acts as a container for all the tasks.
- Operators and Tasks: Tasks are the individual units of work. Operators (e.g., PythonOperator, BashOperator) encapsulate the logic to execute a particular task. Each task has a unique task ID.
- Task Dependencies: You establish the order in which tasks run by setting dependencies, either with bitshift operators (e.g., task1 >> task2) or with methods like set_downstream() and set_upstream().
- Scheduling: The schedule_interval defines how often your DAG runs. It can be a cron expression, a timedelta, or None if you prefer manual triggering. (A minimal skeleton putting these pieces together is sketched right after this list.)
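To make the anatomy concrete, here is a minimal, self-contained sketch. It is not the DAG we build below, just an illustration of the pieces listed above; the dag_id and task IDs are placeholders.

```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

# Default arguments shared by every task in the DAG
default_args = {
    "owner": "airflow",
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}

# The DAG object acts as a container for the tasks
with DAG(
    "anatomy_example",                  # dag_id (placeholder)
    default_args=default_args,
    start_date=datetime(2025, 1, 1),
    schedule_interval="@daily",         # cron string, timedelta, or None
    catchup=False,
) as dag:
    # Operators turn units of work into tasks
    say_hello = BashOperator(task_id="say_hello", bash_command="echo hello")
    finish = PythonOperator(task_id="finish", python_callable=lambda: print("done"))

    # Dependency: say_hello runs before finish
    say_hello >> finish
```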
Step 1: Set Up Your Environment
Before you begin, ensure Apache Airflow is installed and running. If you’re using Docker, you likely launched Airflow via a docker-compose.yaml file, which starts the scheduler, webserver, and other core components. To add your workflows, place DAG files in the mounted dags/ directory – this allows Airflow to automatically detect and execute them.
If you’re new to Airflow or unsure how to structure your project, the Astro CLI simplifies setup. This open-source tool initializes a fully configured Airflow environment with one command:
astro dev init --from-template
This generates all necessary files and even includes an example DAG to help you start experimenting immediately. Read Create an Astro project for more details on the project structure.
Step 2: Create Your DAG File
Create a new Python file (cocktail_api_dag.py) in your dags/ folder. This file will contain all the logic for your DAG.
Step 3: Import Required Modules
At the beginning of the file, we import the necessary modules. These include core Airflow classes, operators for interacting with PostgreSQL and HTTP endpoints, as well as helper libraries like json, pandas for data normalization, and datetime for scheduling.
```python
# Core Airflow imports
from airflow import DAG
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.providers.http.sensors.http import HttpSensor
from airflow.providers.http.operators.http import SimpleHttpOperator
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.exceptions import AirflowException

# Standard library imports
from datetime import datetime, timedelta
from typing import Any
import json
import logging

# Third party imports
from pandas import json_normalize

# Module-level logger used by the helper functions below
logger = logging.getLogger(__name__)
```
- DAG: The central object that orchestrates the workflow.
- Operators and Sensors: Specialized classes to perform tasks such as running SQL commands, making HTTP calls, and executing Python functions.
- PostgresHook: Provides an interface to interact with a PostgreSQL database.
- json_normalize: Used to convert JSON data into a flat table, ideal for processing API responses; a short example follows this list.
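As a quick illustration of json_normalize, here is a toy record shaped loosely like a CocktailDB drink entry (values are illustrative, not a real API response):

```python
from pandas import json_normalize

# Toy record for illustration only
record = {"idDrink": "11007", "strDrink": "Margarita", "strCategory": "Ordinary Drink"}

df = json_normalize(record)                   # one flat row per record
print(df.to_csv(index=False, header=False))   # -> 11007,Margarita,Ordinary Drink
```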
Step 4: Defining Helper Functions
Two helper functions are defined outside the DAG context. These functions will later be used by PythonOperators to process and store the data.
a. Data Processing Function
```python
def _fetch_cocktail(ti: Any) -> None:
    """
    Process cocktail data from the API response

    Args:
        ti: Airflow's task instance object for accessing XCom data

    Flow:
        1. Pulls data from XCom (shared by extract_cocktail task)
        2. Validates the data structure and required fields
        3. Transforms data into CSV format
        4. Saves to a temporary file for database loading

    Raises:
        ValueError: When data is missing or invalid
        AirflowException: For any processing failures
    """
    try:
        # Get data from previous task via XCom
        cocktail_data = ti.xcom_pull(task_ids="extract_cocktail")
        if not cocktail_data or 'drinks' not in cocktail_data:
            raise ValueError("No cocktail data received from API")

        # Extract first drink from results
        cocktail = cocktail_data['drinks'][0]
        logger.info(f"Processing cocktail: {cocktail.get('strDrink', 'Unknown')}")

        # Define and validate required fields
        required_fields = ['idDrink', 'strDrink', 'strCategory', 'strAlcoholic', 'strGlass']
        if not all(field in cocktail for field in required_fields):
            missing_fields = [f for f in required_fields if f not in cocktail]
            raise ValueError(f"Missing required fields: {missing_fields}")

        # Transform data to normalized format and save to CSV
        result = json_normalize({field: cocktail[field] for field in required_fields})
        result.to_csv('/tmp/fetch_cocktail.csv', index=None, header=False)

        logger.info(f"Successfully processed cocktail: {cocktail['strDrink']}")

    except Exception as e:
        logger.error(f"Error processing cocktail: {str(e)}")
        raise AirflowException(f"Cocktail processing failed: {str(e)}")
```
b. Data Storage Function
```python
def _store_cocktail() -> None:
    """
    Store processed cocktail data in PostgreSQL database

    Flow:
        1. Connects to PostgreSQL using Airflow's hook
        2. Copies data from temporary CSV file to database
        3. Cleans up temporary file

    Note:
        - Uses COPY command for efficient data loading
        - Ensures cleanup of temporary files even if operation fails

    Raises:
        AirflowException: When database operations fail
    """
    try:
        # Initialize PostgreSQL connection
        hook = PostgresHook(postgres_conn_id='postgres')

        # Copy data from CSV to database using PostgreSQL COPY command
        hook.copy_expert(
            sql="COPY cocktail FROM stdin WITH DELIMITER as ','",
            filename='/tmp/fetch_cocktail.csv'
        )
        logger.info("Successfully stored cocktail in database")

    except Exception as e:
        logger.error(f"Database operation failed: {str(e)}")
        raise AirflowException(f"Failed to store cocktail: {str(e)}")

    finally:
        # Clean up temporary file regardless of success/failure
        import os
        if os.path.exists('/tmp/fetch_cocktail.csv'):
            os.remove('/tmp/fetch_cocktail.csv')
```
Step 5: Define Default Arguments and Initialize the DAG
Define the default_args dictionary to include common settings for all tasks. This helps manage parameters such as the owner, start date, retries, and retry delay.
```python
# DAG default arguments
# These arguments will be applied to all tasks in the DAG unless overridden
default_args = {
    'owner': 'airflow',                   # Owner of the DAG
    'depends_on_past': False,             # Tasks don't depend on past runs
    'retries': 2,                         # Number of retries if task fails
    'retry_delay': timedelta(minutes=5),  # Delay between retries
    'email_on_failure': True,             # Send email when task fails
}

# DAG definition
# This is the main DAG configuration that defines the workflow
with DAG(
    'cocktail_api_dag',
    default_args=default_args,
    description='Fetches and stores random cocktail data from CocktailDB API',
    start_date=datetime(2025, 1, 1),
    schedule_interval='@daily',
    catchup=False,
    max_active_runs=1,
    tags=['cocktail', 'api']              # Tags for organizing DAGs in the UI (a DAG-level argument)
) as dag:
```
Step 6: Creating the DAG and Its Tasks
The DAG is defined using a context manager (with DAG(...) as dag:), which ensures that all tasks declared within the block are automatically associated with this DAG.
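If you prefer not to use the context manager, each operator can reference the DAG explicitly instead. A brief sketch of the alternative (the SQL here is a placeholder, not the real table definition):

```python
dag = DAG(
    "cocktail_api_dag",
    default_args=default_args,
    start_date=datetime(2025, 1, 1),
    schedule_interval="@daily",
    catchup=False,
)

# Without "with DAG(...) as dag:", every task must be attached explicitly
create_table = PostgresOperator(
    task_id="create_table",
    postgres_conn_id="postgres",
    sql="SELECT 1;",   # placeholder SQL for illustration
    dag=dag,           # explicit association replaces the context manager
)
```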
a. Creating the PostgreSQL Table
```python
# Task definitions
# Task 1: Creates the PostgreSQL table if it doesn't exist
# - Connects to PostgreSQL using Airflow's PostgresOperator
# - Executes SQL command to create the 'cocktail' table
# - Ensures the table has a unique constraint on 'idDrink'
create_table = PostgresOperator(
    task_id='create_table',
    postgres_conn_id='postgres',
    sql='''
        CREATE TABLE IF NOT EXISTS cocktail (
            idDrink INT NOT NULL,
            strDrink TEXT NOT NULL,
            strCategory TEXT NOT NULL,
            strAlcoholic TEXT NOT NULL,
            strGlass TEXT NOT NULL,
            CONSTRAINT unique_drink UNIQUE (idDrink)
        );
    '''
)
```
b. Checking API Availability
```python
# Task 2: Checks if the CocktailDB API is available
# - Acts as a sensor that polls the API endpoint
# - Waits up to 5 minutes (300 seconds) for the API to respond
# - Checks every 60 seconds (1 minute)
# - Uses 'reschedule' mode to free up worker slots while waiting
poll_api = HttpSensor(
    task_id='poll_api',
    http_conn_id='cocktail_api',
    endpoint='api/json/v1/1/random.php',
    poke_interval=60,
    timeout=300,
    mode='reschedule'
)
```
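If you also want the sensor to verify that the response actually contains drink data (not just that the endpoint answers), HttpSensor accepts an optional response_check callable. A hedged variant of the task above:

```python
# Variant of poll_api that succeeds only when the payload looks usable
poll_api = HttpSensor(
    task_id='poll_api',
    http_conn_id='cocktail_api',
    endpoint='api/json/v1/1/random.php',
    response_check=lambda response: "drinks" in response.text,  # validate body, not just availability
    poke_interval=60,
    timeout=300,
    mode='reschedule'
)
```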
c. Extracting Data from the API
```python
# Task 3: Fetches a random cocktail from the API
# - Makes an HTTP GET request to the API
# - Converts the JSON response to a Python dictionary
# - Stores the result in XCom for the next task
# - Logs the API response for debugging
extract_cocktail = SimpleHttpOperator(
    task_id='extract_cocktail',
    http_conn_id='cocktail_api',
    endpoint='api/json/v1/1/random.php',
    method='GET',
    response_filter=lambda response: json.loads(response.text),
    log_response=True
)
```
d. Processing the Extracted Data
```python
# Task 4: Processes the cocktail data
# - Gets data from the previous task using XCom
# - Validates that required fields are present
# - Transforms data into the correct format
# - Saves to a temporary CSV file
# - Includes error handling and logging
fetch_cocktail = PythonOperator(
    task_id='fetch_cocktail',
    python_callable=_fetch_cocktail,
    provide_context=True,
    retry_delay=timedelta(minutes=2)
)
```
e. Storing the Processed Data
```python
# Task 5: Stores the processed data in PostgreSQL
# - Reads from the temporary CSV file
# - Copies data into the PostgreSQL table
# - Cleans up the temporary file after completion
# - Includes error handling and logging
store_cocktail = PythonOperator(
    task_id='store_cocktail',
    python_callable=_store_cocktail,
    retry_delay=timedelta(minutes=2)
)
```
f. Establishing Task Dependencies
```python
# Define task dependencies (execution order)
# create_table -> poll_api -> extract_cocktail -> fetch_cocktail -> store_cocktail
create_table >> poll_api >> extract_cocktail >> fetch_cocktail >> store_cocktail
```
This chain guarantees that each step completes before the next begins.
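The same ordering can also be written with the set_downstream()/set_upstream() methods mentioned earlier; a quick equivalence sketch:

```python
# Equivalent to: create_table >> poll_api >> extract_cocktail
create_table.set_downstream(poll_api)
poll_api.set_downstream(extract_cocktail)

# Equivalent to: extract_cocktail >> fetch_cocktail
fetch_cocktail.set_upstream(extract_cocktail)
```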
Note: Before running the DAG, ensure that the following Airflow connections are properly configured:
- Connection ID: cocktail_api
  - Type: HTTP
  - Host: https://www.thecocktaildb.com/
- Connection ID: postgres
  - Type: Postgres
  - Host: postgres (the Postgres service name from the docker-compose file)
  - Login: airflow
  - Port: 5432
You can configure these connections in the Airflow UI under Admin > Connections.
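Once the connections exist, any task code can look them up by their IDs; a small sketch (purely illustrative, not part of the DAG above):

```python
from airflow.hooks.base import BaseHook

# Fetch connection metadata by the IDs configured under Admin > Connections
api_conn = BaseHook.get_connection("cocktail_api")
db_conn = BaseHook.get_connection("postgres")

print(api_conn.host)                 # e.g. https://www.thecocktaildb.com/
print(db_conn.login, db_conn.port)   # e.g. airflow 5432
```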
Testing Your DAG Locally
Before deploying your DAG, it’s crucial to test it locally to ensure that all tasks execute correctly and dependencies are properly configured. Apache Airflow provides several ways to test and debug DAGs before running them in production.
Validate the DAG Syntax
Before running the DAG, check for syntax errors and misconfigurations by listing the available DAGs:
```bash
# List running containers from the compose file
docker-compose ps

# Execute a command inside the webserver container
docker exec -it airflow-airflow-webserver-1 /bin/bash
airflow dags list

# If your DAG appears in the list, Airflow successfully parsed it.
# To further inspect the DAG's structure, use:
airflow dags show cocktail_api_dag
```
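You can also parse the dags/ folder directly from Python to catch import errors early. A minimal sketch, assuming Airflow is importable in the environment where you run it:

```python
from airflow.models import DagBag

# Parse the dags/ folder the same way the scheduler would
dag_bag = DagBag(dag_folder="dags/", include_examples=False)

print(dag_bag.import_errors)                # {} means every file parsed cleanly
print("cocktail_api_dag" in dag_bag.dags)   # True if our DAG was registered
```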
Trigger Individual Tasks for Testing
You can test individual tasks separately to debug them before running the entire DAG. The following command allows you to run a specific task and view its output:
```bash
# airflow tasks test <dag_id> <task_id> <execution_date>
# For example, to test the create_table task:
airflow tasks test cocktail_api_dag create_table 2025-01-01

# Similarly, test the other tasks:
airflow tasks test cocktail_api_dag poll_api 2025-01-01
airflow tasks test cocktail_api_dag extract_cocktail 2025-01-01
airflow tasks test cocktail_api_dag fetch_cocktail 2025-01-01
airflow tasks test cocktail_api_dag store_cocktail 2025-01-01
```
After running these tasks and confirming they succeeded, query the Postgres database to verify that the table exists and contains some cocktail data:

```bash
docker exec -it airflow-postgres-1 /bin/bash
psql -U airflow
```

Then, inside psql:

```sql
SELECT * FROM information_schema.tables WHERE table_name = 'cocktail';
SELECT * FROM cocktail;
```
Run the Entire DAG Manually
To simulate an actual DAG execution, trigger it manually using:
```bash
airflow dags trigger cocktail_api_dag
airflow dags list-runs -d cocktail_api_dag
```
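If you are on a reasonably recent Airflow release (dag.test() was introduced in 2.5), you can also run the whole DAG in a single local process straight from the DAG file; a hedged sketch you could append to cocktail_api_dag.py:

```python
# Appended at the bottom of cocktail_api_dag.py, outside the `with DAG(...)` block.
# Requires Airflow 2.5+; runs every task of the DAG in-process for quick debugging.
if __name__ == "__main__":
    dag.test()
```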
Check the Task Logs in the Airflow UI
For a more interactive debugging experience, open the Airflow web UI (http://localhost:8080) and:
- Navigate to DAGs and select cocktail_api_dag.
- Click on Graph View or Tree View to visualize task dependencies.
- Click on an individual task and go to the Logs tab to inspect execution details.
Disclaimer
It’s important to note that Apache Airflow is not a data processing framework but an orchestration tool designed to manage and schedule workflows. In this DAG, we performed some data processing within an Airflow task to create a complete, real-world example for learning purposes. However, in a production environment, data processing is typically handled by specialized frameworks such as Apache Spark outside of Airflow. Airflow’s primary role is to coordinate tasks and manage dependencies, not to perform heavy data transformations. This example was intentionally simplified to focus on DAG structure without introducing too many external components.
Conclusion
This detailed example demonstrates how to build a robust, real-world DAG in Apache Airflow. We started by creating helper functions for data processing and storage, then moved on to define our tasks using a mix of PostgreSQL operators, HTTP sensors/operators, and Python operators. By setting clear task dependencies, the DAG ensures a smooth, sequential execution of the workflow – from setting up the database table to ingesting data from an API and finally storing processed results.
With this example as a foundation, you can further customize your DAGs to suit various data workflows. Happy DAG-building and orchestrating your data pipelines!
The Final DAG file
""" DAG to fetch and store cocktail data from CocktailDB API """ # Core Airflow imports from airflow import DAG from airflow.providers.postgres.operators.postgres import PostgresOperator from airflow.providers.http.sensors.http import HttpSensor from airflow.providers.http.operators.http import SimpleHttpOperator from airflow.operators.python import PythonOperator from airflow.providers.postgres.hooks.postgres import PostgresHook from airflow.exceptions import AirflowException # Standard library imports from datetime import datetime, timedelta from typing import Any import json import logging # Third party imports from pandas import json_normalize logger = logging.getLogger(__name__) def _fetch_cocktail(ti: Any) -> None: """ Process cocktail data from the API response Args: ti: Airflow's task instance object for accessing XCom data Flow: 1. Pulls data from XCom (shared by extract_cocktail task) 2. Validates the data structure and required fields 3. Transforms data into CSV format 4. Saves to a temporary file for database loading Raises: ValueError: When data is missing or invalid AirflowException: For any processing failures """ try: # Get data from previous task via XCom cocktail_data = ti.xcom_pull(task_ids="extract_cocktail") if not cocktail_data or 'drinks' not in cocktail_data: raise ValueError("No cocktail data received from API") # Extract first drink from results cocktail = cocktail_data['drinks'][0] logger.info(f"Processing cocktail: {cocktail.get('strDrink', 'Unknown')}") # Define and validate required fields required_fields = ['idDrink', 'strDrink', 'strCategory', 'strAlcoholic', 'strGlass'] if not all(field in cocktail for field in required_fields): missing_fields = [f for f in required_fields if f not in cocktail] raise ValueError(f"Missing required fields: {missing_fields}") # Transform data to normalized format and save to CSV result = json_normalize({field: cocktail[field] for field in required_fields}) result.to_csv('/tmp/fetch_cocktail.csv', index=None, header=False) logger.info(f"Successfully processed cocktail: {cocktail['strDrink']}") except Exception as e: logger.error(f"Error processing cocktail: {str(e)}") raise AirflowException(f"Cocktail processing failed: {str(e)}") def _store_cocktail() -> None: """ Store processed cocktail data in PostgreSQL database Flow: 1. Connects to PostgreSQL using Airflow's hook 2. Copies data from temporary CSV file to database 3. 
Cleans up temporary file Note: - Uses COPY command for efficient data loading - Ensures cleanup of temporary files even if operation fails Raises: AirflowException: When database operations fail """ try: # Initialize PostgreSQL connection hook = PostgresHook(postgres_conn_id='postgres') # Copy data from CSV to database using PostgreSQL COPY command hook.copy_expert( sql="COPY cocktail FROM stdin WITH DELIMITER as ','", filename='/tmp/fetch_cocktail.csv' ) logger.info("Successfully stored cocktail in database") except Exception as e: logger.error(f"Database operation failed: {str(e)}") raise AirflowException(f"Failed to store cocktail: {str(e)}") finally: # Clean up temporary file regardless of success/failure import os if os.path.exists('/tmp/fetch_cocktail.csv'): os.remove('/tmp/fetch_cocktail.csv') # DAG default arguments # These arguments will be applied to all tasks in the DAG unless overridden default_args = { 'owner': 'airflow', # Owner of the DAG 'depends_on_past': False, # Tasks don't depend on past runs 'retries': 2, # Number of retries if task fails 'retry_delay': timedelta(minutes=5), # Delay between retries 'tags': ["cocktail", "api"], # Tags for organizing DAGs in UI 'email_on_failure': True, # Send email when task fails } # DAG definition # This is the main DAG configuration that defines the workflow with DAG( 'cocktail_api_dag', default_args=default_args, description='Fetches and stores random cocktail data from CocktailDB API', start_date=datetime(2025, 1, 1), schedule_interval='@daily', catchup=False, max_active_runs=1 ) as dag: # Task definitions # Task 1: Creates the PostgreSQL table if it doesn't exist # - Connects to PostgreSQL using Airflow's PostgresOperator # - Executes SQL command to create the 'cocktail' table # - Ensures the table has unique constraint on 'idDrink' create_table = PostgresOperator( task_id='create_table', postgres_conn_id='postgres', sql=''' CREATE TABLE IF NOT EXISTS cocktail ( idDrink INT NOT NULL, strDrink TEXT NOT NULL, strCategory TEXT NOT NULL, strAlcoholic TEXT NOT NULL, strGlass TEXT NOT NULL, CONSTRAINT unique_drink UNIQUE (idDrink) ); ''' ) # Task 2: Checks if the CocktailDB API is available # - Acts as a sensor that polls the API endpoint # - Waits up to 5 minutes (300 seconds) for API to respond # - Checks every 60 seconds (1 minute) # - Uses 'reschedule' mode to free up worker slots while waiting poll_api = HttpSensor( task_id='poll_api', http_conn_id='cocktail_api', endpoint='api/json/v1/1/random.php', poke_interval=60, timeout=300, mode='reschedule' ) # Task 3: Fetches a random cocktail from the API # - Makes an HTTP GET request to the API # - Converts JSON response to Python dictionary # - Stores the result in XCom for next task # - Logs the API response for debugging extract_cocktail = SimpleHttpOperator( task_id='extract_cocktail', http_conn_id='cocktail_api', endpoint='api/json/v1/1/random.php', method='GET', response_filter=lambda response: json.loads(response.text), log_response=True ) # Task 4: Processes the cocktail data # - Gets data from previous task using XCom # - Validates required fields are present # - Transforms data into correct format # - Saves to temporary CSV file # - Includes error handling and logging fetch_cocktail = PythonOperator( task_id='fetch_cocktail', python_callable=_fetch_cocktail, provide_context=True, retry_delay=timedelta(minutes=2) ) # Task 5: Stores the processed data in PostgreSQL # - Reads from temporary CSV file # - Copies data into PostgreSQL table # - Cleans up temporary file after 
completion # - Includes error handling and logging store_cocktail = PythonOperator( task_id='store_cocktail', python_callable=_store_cocktail, retry_delay=timedelta(minutes=2) ) # Define task dependencies (execution order) # create_table -> poll_api -> extract_cocktail -> fetch_cocktail -> store_cocktail create_table >> poll_api >> extract_cocktail >> fetch_cocktail >> store_cocktail