tutorial • pipeline • hands-on
Building Your First Data Pipeline with Python
Data Engineering Team • 4/1/2024
Introduction
In this tutorial, you'll build a complete data pipeline that extracts data from a CSV file, transforms it, and loads it into a PostgreSQL database. This foundational knowledge applies to more complex pipelines you'll build in production.
What You'll Build
A pipeline that:
- Extracts sales data from CSV files
- Cleans and validates the data
- Calculates key metrics
- Loads results into PostgreSQL
- Runs on a schedule
Prerequisites
- Python 3.8+
- PostgreSQL installed
- Basic SQL knowledge
- Familiarity with Pandas
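If you don't have a real sales export handy, a small sample file is enough to follow along. This is a minimal sketch: the column names mirror those used in the transform step, and sample_sales.csv is just an example name.

```python
import pandas as pd

# Write a tiny sample CSV so the pipeline can be run end to end locally.
pd.DataFrame({
    'order_id': ['A-1001', 'A-1002', 'A-1002'],   # one duplicate on purpose
    'date': ['2024-01-05', '2024-01-06', '2024-01-06'],
    'product': ['widget', 'gadget', 'gadget'],
    'quantity': [3, 1, 1],
    'price': [9.99, 24.50, 24.50],
}).to_csv('sample_sales.csv', index=False)
```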
Step 1: Extract Data
```python
import pandas as pd


def extract_sales_data(file_path: str) -> pd.DataFrame:
    """Extract sales data from CSV file."""
    df = pd.read_csv(file_path)
    print(f"Extracted {len(df)} records")
    return df
```
Step 2: Transform Data
```python
def transform_sales_data(df: pd.DataFrame) -> pd.DataFrame:
    """Clean and transform sales data."""
    # Convert date column
    df['date'] = pd.to_datetime(df['date'])

    # Remove duplicates
    df = df.drop_duplicates(subset=['order_id'])

    # Calculate total revenue
    df['revenue'] = df['quantity'] * df['price']

    # Add month column for aggregation
    df['month'] = df['date'].dt.to_period('M')

    return df
```
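Running the first two steps against the sample file created earlier makes the cleanup visible: the duplicate order is dropped and the derived columns appear. The printed values in the comments are illustrative.

```python
df = extract_sales_data('sample_sales.csv')   # sample file from the sketch above
df = transform_sales_data(df)

print(df[['order_id', 'revenue', 'month']])
#   order_id  revenue    month
# 0   A-1001    29.97  2024-01
# 1   A-1002    24.50  2024-01
```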
Step 3: Validate Data
```python
from pydantic import BaseModel, validator


class SalesRecord(BaseModel):
    order_id: str
    date: str
    product: str
    quantity: int
    price: float

    @validator('quantity')
    def quantity_positive(cls, v):
        if v <= 0:
            raise ValueError('Quantity must be positive')
        return v


def validate_data(df: pd.DataFrame) -> bool:
    """Validate data quality."""
    try:
        records = df.to_dict('records')
        [SalesRecord(**record) for record in records]
        return True
    except Exception as e:
        print(f"Validation failed: {e}")
        return False
```
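Two things are worth noting. First, SalesRecord declares date as a string, so this check fits the raw rows straight out of extraction; after transform_sales_data the date column holds datetimes and would fail the str field. Second, the validator decorator is the Pydantic v1 API; on Pydantic v2 the equivalent is field_validator. A minimal sketch of the v2 version:

```python
from pydantic import BaseModel, field_validator


class SalesRecord(BaseModel):
    order_id: str
    date: str
    product: str
    quantity: int
    price: float

    @field_validator('quantity')
    @classmethod
    def quantity_positive(cls, v: int) -> int:
        if v <= 0:
            raise ValueError('Quantity must be positive')
        return v
```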
Step 4: Load to Database
```python
from sqlalchemy import create_engine


def load_to_postgres(df: pd.DataFrame, table_name: str):
    """Load data to PostgreSQL."""
    engine = create_engine(
        'postgresql://user:password@localhost:5432/sales_db'
    )
    df.to_sql(
        table_name,
        engine,
        if_exists='append',
        index=False
    )
    print(f"Loaded {len(df)} records to {table_name}")
```
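The connection string above hardcodes credentials for brevity. In practice you would read it from configuration or the environment instead; a minimal sketch, assuming an environment variable named DATABASE_URL:

```python
import os

from sqlalchemy import create_engine


def get_engine():
    """Build the engine from configuration rather than a hardcoded URL."""
    # DATABASE_URL is an assumed variable name; the fallback keeps the
    # local example above working unchanged.
    url = os.environ.get(
        'DATABASE_URL',
        'postgresql://user:password@localhost:5432/sales_db',
    )
    return create_engine(url)
```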
Step 5: Orchestrate with Airflow
```python
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'data-team',
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    'sales_etl_pipeline',
    default_args=default_args,
    schedule_interval='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False,
) as dag:
    extract_task = PythonOperator(
        task_id='extract',
        python_callable=extract_sales_data,
    )
    transform_task = PythonOperator(
        task_id='transform',
        python_callable=transform_sales_data,
    )
    load_task = PythonOperator(
        task_id='load',
        python_callable=load_to_postgres,
    )

    extract_task >> transform_task >> load_task
```
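As written, the DAG only defines the ordering: extract_sales_data never receives a file path, and the transform and load callables never receive the DataFrame. PythonOperator pushes a task's return value to XCom, but the downstream callables here never pull it, and DataFrames don't serialize through the default XCom backend anyway. A minimal sketch of one way to wire it, running the whole ETL in a single callable; the file path and table name are assumptions for illustration:

```python
def run_sales_etl(file_path: str = '/data/sales/latest.csv',
                  table_name: str = 'sales_metrics') -> None:
    """Run extract, validate, transform, and load as one unit."""
    df = extract_sales_data(file_path)
    if not validate_data(df):          # SalesRecord matches the raw rows
        raise ValueError('Validation failed; aborting load')
    df = transform_sales_data(df)
    load_to_postgres(df, table_name)


# In the DAG, a single task can then replace the three-task chain:
# etl_task = PythonOperator(task_id='sales_etl', python_callable=run_sales_etl)
```

For larger pipelines you would keep the steps as separate tasks and hand data between them through intermediate storage (for example, staging files or a staging table) rather than in memory.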
Best Practices
- Idempotency: Ensure pipelines can run multiple times safely
- Data validation: Always validate before loading
- Error handling: Log errors and implement retries
- Monitoring: Track pipeline metrics and failures
- Testing: Write unit tests for transformation logic
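The last point is easy to act on right away. Here is a minimal pytest sketch for the transformation logic, assuming the functions above live in a module named pipeline.py:

```python
import pandas as pd

from pipeline import transform_sales_data  # assumed module name


def test_transform_drops_duplicates_and_computes_revenue():
    raw = pd.DataFrame({
        'order_id': ['A-1', 'A-1', 'A-2'],
        'date': ['2024-01-05', '2024-01-05', '2024-01-06'],
        'product': ['widget', 'widget', 'gadget'],
        'quantity': [2, 2, 1],
        'price': [10.0, 10.0, 25.0],
    })

    result = transform_sales_data(raw)

    assert len(result) == 2                    # duplicate order_id removed
    assert result['revenue'].tolist() == [20.0, 25.0]
    assert str(result['month'].iloc[0]) == '2024-01'
```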
Next Steps
- Add data quality checks with Great Expectations
- Implement incremental loading for efficiency
- Add alerting for pipeline failures
- Scale with PySpark for larger datasets
Learn More
Check out our hands-on projects for complete working examples of data pipelines, including: