Create a modern ETL pipeline with Prefect to extract Pokemon data from the PokeAPI, transform it, and load it into SQLite. Perfect for learning Prefect's intuitive task and flow decorators with a fun, beginner-friendly example that demonstrates retry logic and error handling.
This document explains the Prefect example provided in prefect_example.py.
The example demonstrates how to:

- extract Pokemon data from the PokeAPI, with automatic retries for transient failures
- transform the raw JSON response into a simplified record
- load the result into a SQLite database
```python
import requests
import sqlite3

from prefect import task, flow
```
This section imports the necessary modules: requests for API calls, sqlite3 for database operations, and Prefect's task and flow decorators.
```python
@task(retries=3, retry_delay_seconds=5)
def extract_pokemon_data(pokemon_id: int):
    """Extract data for a single Pokemon from the PokeAPI."""
    url = f"https://pokeapi.co/api/v2/pokemon/{pokemon_id}"
    response = requests.get(url)
    response.raise_for_status()
    return response.json()
```
This task fetches data from the PokeAPI. It's decorated with @task and includes retry logic for robustness.
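A fixed 5-second delay is fine for a demo; if the API is flaky, you may prefer growing waits between attempts. A minimal sketch, assuming Prefect 2's exponential_backoff helper (not used in prefect_example.py), plus a request timeout so a hung connection fails fast enough to be retried:

```python
import requests
from prefect import task
from prefect.tasks import exponential_backoff


@task(retries=3, retry_delay_seconds=exponential_backoff(backoff_factor=5))
def extract_pokemon_data(pokemon_id: int):
    """Extract a Pokemon, waiting roughly 5s, 10s, 20s between retries."""
    url = f"https://pokeapi.co/api/v2/pokemon/{pokemon_id}"
    # timeout= makes a stalled connection raise instead of hanging forever,
    # which lets Prefect's retry logic kick in.
    response = requests.get(url, timeout=10)
    response.raise_for_status()
    return response.json()
```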
```python
@task
def transform_pokemon_data(pokemon_data: dict):
    """Transform the raw Pokemon data into a simplified format."""
    return {
        "id": pokemon_data["id"],
        "name": pokemon_data["name"],
        "height": pokemon_data["height"],
        "weight": pokemon_data["weight"],
        "base_experience": pokemon_data["base_experience"],
    }
```
This task transforms the raw API data into a simplified format. It's a pure function, demonstrating how Prefect can handle data processing.
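To make the transformation concrete, here is roughly what goes in and what comes out (the values are the PokeAPI's figures for Bulbasaur, shown for illustration; the real payload has dozens more fields):

```python
raw = {
    "id": 1,
    "name": "bulbasaur",
    "height": 7,            # decimetres, as reported by the PokeAPI
    "weight": 69,           # hectograms, as reported by the PokeAPI
    "base_experience": 64,
    "abilities": [],        # ...plus many fields the pipeline discards
}

# transform_pokemon_data(raw) keeps only the five core fields:
# {"id": 1, "name": "bulbasaur", "height": 7, "weight": 69, "base_experience": 64}
```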
```python
@task
def load_pokemon_data(pokemon_data: dict):
    """Load the transformed Pokemon data into a SQLite database."""
    conn = sqlite3.connect("pokemon.db")
    cursor = conn.cursor()
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS pokemon
        (id INTEGER PRIMARY KEY, name TEXT, height INTEGER,
         weight INTEGER, base_experience INTEGER)
    ''')
    cursor.execute('''
        INSERT OR REPLACE INTO pokemon
        (id, name, height, weight, base_experience)
        VALUES (:id, :name, :height, :weight, :base_experience)
    ''', pokemon_data)
    conn.commit()
    conn.close()
```
This task handles database operations, creating a table if it doesn't exist and inserting or updating Pokemon data.
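One caveat with the version above: if the INSERT raises, conn.close() is never reached. A sketch of the same task written with context managers, so the connection is released even on failure and a Prefect retry can rerun the task cleanly:

```python
import sqlite3
from contextlib import closing

from prefect import task


@task
def load_pokemon_data(pokemon_data: dict):
    """Load the transformed Pokemon data, closing the connection even on error."""
    with closing(sqlite3.connect("pokemon.db")) as conn:
        with conn:  # commits on success, rolls back on exception
            conn.execute('''
                CREATE TABLE IF NOT EXISTS pokemon
                (id INTEGER PRIMARY KEY, name TEXT, height INTEGER,
                 weight INTEGER, base_experience INTEGER)
            ''')
            conn.execute('''
                INSERT OR REPLACE INTO pokemon
                (id, name, height, weight, base_experience)
                VALUES (:id, :name, :height, :weight, :base_experience)
            ''', pokemon_data)
```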
```python
@flow
def pokemon_etl_flow(start_id: int = 1, end_id: int = 10):
    """Main ETL flow to process Pokemon data."""
    for pokemon_id in range(start_id, end_id + 1):
        raw_data = extract_pokemon_data(pokemon_id)
        transformed_data = transform_pokemon_data(raw_data)
        load_pokemon_data(transformed_data)


if __name__ == "__main__":
    pokemon_etl_flow()
```
The flow ties all tasks together, creating a complete ETL pipeline. It processes Pokemon data for a range of IDs, demonstrating how Prefect can handle iterative workflows.
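The loop runs strictly sequentially: each Pokemon is fully loaded before the next is fetched. If you want the steps to overlap, Prefect tasks can also be submitted to the task runner rather than called directly. A sketch of a concurrent variant, reusing the tasks defined above (pokemon_etl_flow_concurrent is a hypothetical name, not part of prefect_example.py):

```python
from prefect import flow


@flow
def pokemon_etl_flow_concurrent(start_id: int = 1, end_id: int = 10):
    """Same pipeline, but each Pokemon's tasks are submitted concurrently."""
    futures = []
    for pokemon_id in range(start_id, end_id + 1):
        raw = extract_pokemon_data.submit(pokemon_id)     # returns a PrefectFuture
        transformed = transform_pokemon_data.submit(raw)  # futures resolve automatically
        futures.append(load_pokemon_data.submit(transformed))
    for future in futures:
        future.wait()  # block until every load has finished
```

Note that concurrent loads all write to the same pokemon.db file, so SQLite's write lock can become the bottleneck; for this toy dataset the sequential version is perfectly adequate.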
To run this example:
```bash
pip install --upgrade prefect requests
python prefect_example.py
```
This will execute the ETL flow, processing Pokemon data and storing it in a SQLite database named pokemon.db.
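You can verify the load with a quick query against the table created by load_pokemon_data:

```python
import sqlite3

conn = sqlite3.connect("pokemon.db")
for row in conn.execute("SELECT id, name, base_experience FROM pokemon ORDER BY id"):
    print(row)  # e.g. (1, 'bulbasaur', 64)
conn.close()
```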
To monitor the flow in the Prefect UI, start a local Prefect server in one terminal, point your client at it, and run the script in another:

```bash
prefect server start
prefect config set PREFECT_API_URL=http://127.0.0.1:4200/api
python prefect_example.py
```

Then open http://localhost:4200 to see the Prefect UI.

Let's break down the key features of this example and their significance:
Task Decoration: We use the @task decorator to define individual tasks. This allows Prefect to manage and orchestrate these tasks.
Significance: This enables Prefect to handle each function as a separate unit of work, making it easier to monitor, retry, and manage dependencies between tasks.
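The decorator also accepts options beyond retries. A sketch of a few commonly used ones (names as we understand Prefect 2's API; none of these appear in the example):

```python
from prefect import task


@task(
    name="extract-pokemon",    # display name shown in the UI instead of the function name
    tags=["pokeapi", "http"],  # tags for filtering runs and applying concurrency limits
    timeout_seconds=30,        # mark the task run as failed if it exceeds 30 seconds
)
def extract_pokemon_data(pokemon_id: int):
    ...
```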
Retries: The extract_pokemon_data task has a retry mechanism (retries=3, retry_delay_seconds=5).
Significance: This is crucial for handling transient failures, like network issues, which are common in real-world scenarios. It improves the robustness of the data pipeline.
Flow Definition: The @flow decorator defines our main workflow. It orchestrates the execution of our tasks.
Significance: This allows Prefect to manage the entire workflow, handling task dependencies, parallelization, and overall execution flow.
Parameter Passing: The flow accepts parameters (start_id and end_id), which allow for flexible execution.
Significance: This makes the flow reusable and configurable, allowing users to process different ranges of Pokemon without changing the code.
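For example, the same flow can be pointed at any slice of the Pokedex:

```python
# Process the first 10 Pokemon (the defaults):
pokemon_etl_flow()

# Process Pikachu (ID 25) through Mewtwo (ID 150) without touching the code:
pokemon_etl_flow(start_id=25, end_id=150)
```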
Error Handling: We use response.raise_for_status() to handle HTTP errors, which Prefect can then manage.
Significance: This ensures that any issues with the API call are caught and can be handled appropriately by Prefect's error handling mechanisms.
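One subtlety: raise_for_status() treats every non-2xx response as a failure, so a permanently missing ID would burn all three retries before failing. A hedged sketch that skips 404s while still letting transient server errors trigger retries:

```python
import requests
from prefect import task


@task(retries=3, retry_delay_seconds=5)
def extract_pokemon_data(pokemon_id: int):
    """Return the Pokemon's JSON, or None if the ID does not exist."""
    url = f"https://pokeapi.co/api/v2/pokemon/{pokemon_id}"
    response = requests.get(url, timeout=10)
    if response.status_code == 404:
        return None  # a missing Pokemon is permanent; retrying won't help
    response.raise_for_status()  # 5xx etc. still raise, so Prefect retries them
    return response.json()
```

The flow would then need to skip None results before transforming them.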
Database Operations: The load_pokemon_data task demonstrates how to interact with a database within a Prefect task.
Significance: This shows how Prefect can be used to orchestrate not just data processing, but also data storage operations, which is a common requirement in ETL pipelines.
Modular Design: The ETL process is broken down into separate tasks for extraction, transformation, and loading.
Significance: This modular approach makes the code more maintainable and allows for easier debugging and optimization of individual steps in the pipeline.
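A concrete payoff of this modularity: in Prefect 2, a task's undecorated function is available as .fn, so each step can be unit-tested without any orchestration running:

```python
def test_transform_keeps_only_core_fields():
    raw = {"id": 1, "name": "bulbasaur", "height": 7, "weight": 69,
           "base_experience": 64, "abilities": ["overgrow"]}
    # .fn calls the plain Python function, bypassing Prefect entirely
    result = transform_pokemon_data.fn(raw)
    assert result == {"id": 1, "name": "bulbasaur", "height": 7,
                      "weight": 69, "base_experience": 64}
```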
Automatic Logging and Monitoring: While not explicitly coded, Prefect automatically provides logging and monitoring for all tasks and flows.
Significance: This gives visibility into the execution of the pipeline, making it easier to track progress and diagnose issues.
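To add your own messages to that stream, use get_run_logger (Prefect's documented helper) inside a task or flow:

```python
from prefect import get_run_logger, task


@task(retries=3, retry_delay_seconds=5)
def extract_pokemon_data(pokemon_id: int):
    logger = get_run_logger()
    logger.info("Fetching Pokemon %s from the PokeAPI", pokemon_id)
    ...
```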
Scalability: The flow is designed to handle multiple Pokemon entries in a loop.
Significance: This structure allows for easy scaling of the pipeline to handle larger datasets by simply adjusting the start_id and end_id parameters.
Integration with External Services: The example demonstrates integration with an external API (PokeAPI) and a local database (SQLite).
Significance: This showcases Prefect's ability to orchestrate workflows that involve multiple data sources and destinations, a common requirement in data engineering.
These features demonstrate how Prefect can be used to create robust, scalable, and maintainable data pipelines. The example covers common data engineering tasks, while Prefect supplies built-in error handling, retries, and workflow management, making it easier to build reliable ETL processes.