Build a production-ready data pipeline using Apache Airflow DAGs to process daily orders. Learn to orchestrate complex workflows with multiple operators, implement branching logic and data quality checks, and manage task dependencies in the industry's most widely adopted orchestration tool.
Apache Airflow: the most established and widely adopted orchestration tool in the industry, and particularly strong for complex, large-scale workflows.
This example demonstrates how to create a data processing pipeline using Apache Airflow. The key features it showcases are summarized after the code walkthrough below.
import os
from datetime import datetime, timedelta

import pandas as pd

from airflow import DAG
from airflow.models import Variable
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.sensors.filesystem import FileSensor
from airflow.utils.task_group import TaskGroup

BASE_DIR = os.path.dirname(os.path.abspath(__file__))
This section imports necessary Airflow modules and sets up the base directory for file operations.
default_args = {
    'owner': 'data_engineer',
    'depends_on_past': False,
    'email': ['your_email@example.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    'daily_order_processing',
    default_args=default_args,
    description='Daily order processing pipeline',
    schedule_interval='0 1 * * *',
    start_date=datetime(2023, 1, 1),
    catchup=False,
) as dag:
    ...  # Tasks defined here
Here, we define the DAG with its configuration, including scheduling and default arguments.
The DAG includes several types of tasks: a file sensor, Python extract/transform tasks, a branching task, a data quality check, Bash commands, and Postgres load tasks. Example task:
extract_main = PythonOperator(
    task_id='extract_from_main',
    python_callable=extract_order_data,
)
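The python_callable itself is not shown here; below is a minimal sketch of what extract_order_data might look like, assuming the raw file sits in a data/ subfolder next to the DAG file and is staged to /tmp for downstream tasks (both paths are assumptions, not the original code):

def extract_order_data():
    # Read the raw orders CSV that sits next to the DAG file (assumed path).
    df = pd.read_csv(os.path.join(BASE_DIR, 'data', 'raw_orders.csv'))
    # Stage the extracted data where the transform task can pick it up (assumed location).
    df.to_csv('/tmp/extracted_orders.csv', index=False)
    return len(df)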
Task dependencies are set up at the end of the DAG definition:
start >> check_for_data >> choose_source >> [extract_main, extract_backup] >> transform >> quality_check >> load_group >> end
This defines the order in which tasks should be executed.
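The start and end markers are not shown above; a plausible sketch, assuming they are simple EmptyOperator placeholders:

from airflow.operators.empty import EmptyOperator

start = EmptyOperator(task_id='start')
end = EmptyOperator(task_id='end')

# Note: because the branch skips one of the two extract tasks, the task that joins the
# branches (transform here) typically needs a relaxed trigger rule such as
# trigger_rule='none_failed' so it still runs when one upstream task was skipped.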
Key Features Demonstrated in This Example
Scheduling: The DAG is scheduled to run daily at 1 AM (schedule_interval='0 1 * * *'). This shows Airflow's ability to automate recurring workflows.
Task Dependencies: The pipeline has a clear flow of tasks, defined by the dependency chain at the end of the DAG definition. This demonstrates Airflow's core feature of managing task dependencies.
Branching: The choose_data_source task uses a BranchPythonOperator to decide whether to extract from the main or backup source. This shows how Airflow can handle conditional workflows (a sketch of the branch follows this list).
Error Handling: The default_args dictionary includes retry logic and email notifications for failures, showcasing Airflow's built-in error handling capabilities.
Sensors: The FileSensor is used to check for the existence of the input file before proceeding. This is crucial in real-world scenarios where you might be waiting for upstream data (a sketch of the sensor follows this list).
Data Quality Checks: The check_data_quality task performs a simple quality check, demonstrating how you can integrate data validation into your pipeline (a sketch follows this list).
Different Operators: The DAG uses various operators (PythonOperator, BashOperator, PostgresOperator) showing Airflow's flexibility in task execution.
Task Groups: The load_data tasks are grouped using a TaskGroup, which helps in organizing complex DAGs with many tasks (a sketch of the group follows this list).
Variables: The decide_data_source function uses an Airflow Variable, showing how you can parameterize your workflows.
Connection Management: The PostgresOperator uses a postgres_conn_id, demonstrating Airflow's ability to manage connections securely.
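The DAG body above only shows one task, so the remaining pieces referenced in this list are sketched below as hedged examples rather than the original code. Starting with the file sensor, assuming the fs_default filesystem connection and the data/raw_orders.csv path from the directory layout shown later:

check_for_data = FileSensor(
    task_id='check_for_data',
    fs_conn_id='fs_default',  # filesystem connection (assumed name)
    filepath=os.path.join(BASE_DIR, 'data', 'raw_orders.csv'),
    poke_interval=60,         # re-check every 60 seconds
    timeout=60 * 60,          # fail if the file has not appeared after an hour
)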
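The branching task, sketched assuming that the use_backup_source Variable configured in the setup steps drives the decision and that the backup task_id is extract_from_backup; this is also the Variable usage mentioned above:

def decide_data_source():
    # Read the Airflow Variable; fall back to 'false' if it has not been set.
    use_backup = Variable.get('use_backup_source', default_var='false')
    # Return the task_id of the branch that should run.
    return 'extract_from_backup' if use_backup.lower() == 'true' else 'extract_from_main'

choose_source = BranchPythonOperator(
    task_id='choose_data_source',
    python_callable=decide_data_source,
)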
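The data quality check, sketched assuming the transform task stages its output at /tmp/transformed_orders.csv and that the data has an order_id column (both assumptions):

def check_data_quality():
    df = pd.read_csv('/tmp/transformed_orders.csv')
    # Raising an exception fails the task, which triggers the retries and email alerts
    # configured in default_args.
    if df.empty:
        raise ValueError('No rows found in transformed data')
    if df['order_id'].isnull().any():
        raise ValueError('Found orders with a missing order_id')

quality_check = PythonOperator(
    task_id='check_data_quality',
    python_callable=check_data_quality,
)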
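And the load task group, sketched assuming a target table named orders, a hypothetical load_transformed_orders callable, and the postgres_default connection configured in the setup steps below; the postgres_conn_id argument is the connection management mentioned above:

with TaskGroup(group_id='load_data') as load_group:
    create_table = PostgresOperator(
        task_id='create_orders_table',
        postgres_conn_id='postgres_default',
        sql="""
            CREATE TABLE IF NOT EXISTS orders (
                order_id    INTEGER PRIMARY KEY,
                customer_id INTEGER,
                amount      NUMERIC,
                order_date  DATE
            );
        """,
    )

    load_orders = PythonOperator(
        task_id='load_orders',
        python_callable=load_transformed_orders,  # hypothetical callable that inserts the transformed CSV
    )

    create_table >> load_orders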
This example illustrates a realistic data engineering scenario where you're processing daily order data. It includes common elements like extracting data, transforming it, performing quality checks, and loading it into a database. The branching logic to choose between main and backup data sources is particularly relevant in real-world situations where data availability might be inconsistent.
By using Airflow, you get a clear visualization of the workflow, automatic retries, failure notifications, and the ability to easily manage complex dependencies between tasks. This makes it easier to build robust, production-grade data pipelines.
To run this example:
Place daily_order_processing_dag.py in your Airflow DAGs folder (typically ~/airflow/dags/). The DAG will run daily at 1 AM, processing order data, performing quality checks, and loading the results into a PostgreSQL database.
This DAG demonstrates a robust data processing pipeline with error handling, data quality checks, and flexible execution paths.
This setup assumes your directory structure looks like:
/path/to/your/dag/
├── your_dag_file.py
└── data/
├── raw_orders.csv
└── backup_orders.csv
Make sure that the CSV files shown above (raw_orders.csv and backup_orders.csv) are in the data/ directory next to your DAG file, since BASE_DIR resolves to the directory containing the DAG.
Install Airflow:
pip install apache-airflow
pip install apache-airflow-providers-postgres pandas
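If pip runs into dependency conflicts, the Airflow project recommends installing against its published constraints file; a sketch assuming Airflow 2.9.3 (adjust the version to the one you want):

AIRFLOW_VERSION=2.9.3
PYTHON_VERSION="$(python -c 'import sys; print(f"{sys.version_info.major}.{sys.version_info.minor}")')"
pip install "apache-airflow==${AIRFLOW_VERSION}" \
  --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"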
Set up Airflow:
export AIRFLOW_HOME=~/airflow
airflow db init
airflow users create --username admin --firstname Your --lastname Name --role Admin --email your_email@example.com
Create the data directory for the test files next to your DAG file (if it does not exist already). Since the DAG will live in ~/airflow/dags/, that means:
mkdir -p ~/airflow/dags/data
If the dags folder still doesn't exist after running airflow db init, you can create it manually:
mkdir -p ~/airflow/dags
Copy the DAG file into your Airflow DAGs folder:
cp your_dag_file.py ~/airflow/dags/
Ensure that Airflow can read your DAG:
airflow dags list
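If daily_order_processing does not appear in the list, check for import errors:

airflow dags list-import-errors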
Start the Airflow webserver:
airflow webserver --port 8080
In a new terminal, start the Airflow scheduler:
source <path-to-your-venv-folder>/venv/bin/activate
You should see the name of your virtual environment in your terminal prompt, indicating it's active.
export AIRFLOW_HOME=~/airflow
airflow scheduler
Access the Airflow web interface at http://localhost:8080 and log in with the credentials you created. In the Airflow UI, go to Admin > Variables and add a variable:
Key: use_backup_source
Value: false
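Alternatively, you can set the same variable from the command line:

airflow variables set use_backup_source false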
Set up a Postgres connection in Airflow:
Conn Id: postgres_default
Conn Type: Postgres
Host: localhost (or your Postgres host)
Schema: your_database_name
Login: your_username
Password: your_password
Port: 5432 (default Postgres port)
Make sure you have a local Postgres database running and accessible with the credentials you provided.
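If you prefer the command line, the same connection can be created with the connections CLI (substitute your own host, database, and credentials; if postgres_default already exists, edit it in the UI instead):

airflow connections add postgres_default \
    --conn-type postgres \
    --conn-host localhost \
    --conn-schema your_database_name \
    --conn-login your_username \
    --conn-password your_password \
    --conn-port 5432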
Now you should be able to trigger the DAG from the Airflow UI and see it run through all the tasks. The DAG will process the data from raw_orders.csv, transform it, and attempt to load it into your Postgres database.
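You can also trigger a run from the command line, or do a one-off local test run without the scheduler:

airflow dags trigger daily_order_processing
airflow dags test daily_order_processing 2023-01-01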
Remember to adjust file paths, database credentials, and other settings as needed for your specific setup. This example assumes a Unix-like environment (Linux or macOS). If you're on Windows, you may need to adjust some commands and file paths.