Build a data pipeline with Dagster to fetch stock data from Yahoo Finance, calculate moving averages, and store results in a database. Learn Dagster's functional approach with ops, jobs, and schedules while working with real financial data.
This document explains the Dagster example provided in stock_analysis.py.
The example demonstrates how to:

- fetch historical stock data from Yahoo Finance,
- calculate 5-day and 20-day moving averages,
- store the results in a SQLite database, and
- orchestrate and schedule the whole pipeline with Dagster ops, a job, and a schedule.
```python
import yfinance as yf
import pandas as pd
import sqlite3
from dagster import op, job, schedule, Definitions, Field, String, Int, In, Out
from datetime import datetime, timedelta
```
This block imports the necessary modules: yfinance for fetching stock data, pandas for data manipulation, sqlite3 for database operations, and the Dagster components used to construct the pipeline.
```python
@op(
    config_schema={
        "symbol": Field(String, description="Stock symbol to fetch"),
        "days": Field(Int, default_value=30, description="Number of days of historical data")
    },
    out=Out(pd.DataFrame)
)
def fetch_stock_data(context):
    # ... (implementation details)
```
This op fetches stock data from Yahoo Finance. It uses a config schema to allow customization of the stock symbol and date range.
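The body is elided in the snippet above. A minimal sketch of what it might contain is shown below; it reuses the imports from the top of the script, and details such as the extra symbol column are illustrative assumptions rather than the exact code in stock_analysis.py:

```python
# Sketch only: assumes the imports shown at the top of stock_analysis.py.
@op(
    config_schema={
        "symbol": Field(String, description="Stock symbol to fetch"),
        "days": Field(Int, default_value=30, description="Number of days of historical data")
    },
    out=Out(pd.DataFrame)
)
def fetch_stock_data(context):
    symbol = context.op_config["symbol"]
    days = context.op_config["days"]
    end_date = datetime.now()
    start_date = end_date - timedelta(days=days)

    context.log.info(f"Fetching {days} days of data for {symbol}")
    df = yf.download(symbol, start=start_date, end=end_date)
    df["symbol"] = symbol  # keep the ticker alongside the price columns
    return df
```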
```python
@op(ins={'df': In(pd.DataFrame)}, out=Out(pd.DataFrame))
def calculate_moving_averages(context, df):
    # ... (implementation details)
```
This op calculates 5-day and 20-day moving averages for the stock data.
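Again the body is elided; a possible implementation using pandas rolling windows (the column names MA_5 and MA_20 are illustrative) could look like this:

```python
# Sketch only: assumes the imports shown at the top of stock_analysis.py.
@op(ins={'df': In(pd.DataFrame)}, out=Out(pd.DataFrame))
def calculate_moving_averages(context, df):
    df = df.copy()
    df["MA_5"] = df["Close"].rolling(window=5).mean()    # 5-day moving average
    df["MA_20"] = df["Close"].rolling(window=20).mean()  # 20-day moving average
    context.log.info(f"Calculated moving averages over {len(df)} rows")
    return df
```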
```python
@op(ins={'df': In(pd.DataFrame)})
def store_in_database(context, df):
    # ... (implementation details)
```
This op stores the processed data in a SQLite database.
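One way the op could write the DataFrame to SQLite is sketched below; the database file and table name ("stock_data.db", "stock_prices") are assumptions for illustration:

```python
# Sketch only: assumes the imports shown at the top of stock_analysis.py.
@op(ins={'df': In(pd.DataFrame)})
def store_in_database(context, df):
    conn = sqlite3.connect("stock_data.db")
    try:
        # Overwrite the table on each run; use if_exists="append" to accumulate history.
        df.to_sql("stock_prices", conn, if_exists="replace")
        context.log.info(f"Stored {len(df)} rows in stock_prices")
    finally:
        conn.close()
```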
```python
@job
def stock_analysis_job():
    df = fetch_stock_data()
    df_with_ma = calculate_moving_averages(df)
    store_in_database(df_with_ma)
```
The job combines the ops into a complete workflow, defining the structure of the data pipeline.
```python
@schedule(cron_schedule="0 18 * * 1-5", job=stock_analysis_job)
def stock_analysis_schedule(context):
    return {"ops": {"fetch_stock_data": {"config": {"symbol": "AAPL", "days": 30}}}}
```
This schedule sets up the job to run at 6 PM on weekdays, demonstrating Dagster's ability to handle recurring workflows.
```python
defs = Definitions(
    jobs=[stock_analysis_job],
    schedules=[stock_analysis_schedule]
)
```
The Definitions object bundles jobs and schedules, making them accessible to Dagster tools and UIs.
To run this example:

1. Ensure you have Dagster and the required libraries installed:

   ```
   pip install dagster dagster-webserver pandas yfinance
   ```

2. Save the script as stock_analysis.py.

3. Run the Dagster UI:

   ```
   dagster dev -f stock_analysis.py
   ```

4. Open a web browser and go to http://localhost:3000.

5. Navigate to the "Playground" tab, select stock_analysis_job, and click "Launch Run".

6. In the configuration section, you can specify the stock symbol and number of days. For example:

   ```yaml
   ops:
     fetch_stock_data:
       config:
         symbol: "MSFT"
         days: 60
   ```

7. Click "Launch Run" to execute the job.
You can also see the schedule in the "Schedules" tab, though you'd need to set up a Dagster Daemon to run scheduled jobs.
This example showcases how Dagster simplifies the creation and management of data workflows, providing built-in features for orchestration, configuration, scheduling, and execution management.
Here's a breakdown of the key features of our Dagster example:
Configuration: We use the config_schema parameter in the fetch_stock_data op to define configurable options (stock symbol and number of days). This allows for flexible job execution without changing the code.
Ops: Each function (fetch_stock_data, calculate_moving_averages, store_in_database) is decorated with @op, representing a discrete unit of work in our data pipeline.
Input/Output Typing: We use In and Out to specify the types of inputs and outputs for each op. This improves type safety and helps with data flow visualization in the Dagster UI.
Job: The stock_analysis_job combines these ops into a cohesive workflow, defining the structure of our data pipeline.
Scheduling: The stock_analysis_schedule sets up the job to run automatically at 6 PM on weekdays, demonstrating Dagster's ability to handle recurring workflows.
Logging: We use context.log.info() to log important information at each step, leveraging Dagster's built-in logging capabilities.
Database Integration: The store_in_database op demonstrates how to integrate database operations into your Dagster pipeline, storing results in a SQLite database.
Error Handling: While not explicitly shown in this example, Dagster records op failures and surfaces them in the UI, and retries can be enabled per op via a retry policy.
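A hedged sketch of how a retry policy could be attached to an op (useful for transient failures such as a flaky Yahoo Finance request) might look like this; the specific numbers are illustrative:

```python
from dagster import Backoff, RetryPolicy, op

# Illustrative: retry up to 3 times with exponential backoff starting from a 10-second delay.
@op(retry_policy=RetryPolicy(max_retries=3, delay=10, backoff=Backoff.EXPONENTIAL))
def fetch_stock_data_with_retries(context):
    context.log.info("Dagster re-executes this op if it raises an exception")
    # ... fetch logic as in fetch_stock_data ...
```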
Dagster Definitions: The Definitions object bundles jobs and schedules together, making it easy to load them into Dagster tools and UIs.
Modular Design: Each op is a self-contained unit, promoting code reusability and easier testing.
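For instance, an op can be unit-tested in isolation by building a context by hand; the snippet below is a sketch that assumes the config-driven fetch_stock_data op defined earlier:

```python
from dagster import build_op_context

# Invoke the op directly with a hand-built context -- no job, UI, or daemon required.
context = build_op_context(op_config={"symbol": "AAPL", "days": 30})
df = fetch_stock_data(context)
assert not df.empty
```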
These features showcase how Dagster can handle real-world data engineering tasks with a focus on maintainability, observability, and reproducibility. The framework provides a structured way to build complex data pipelines while offering flexibility in configuration and execution.
The hello_dagster.py file in this same folder is a minimal Dagster example that can be used to verify that Dagster is working correctly on your system.
The stock_analysis_simple.py file is a simplified stock analysis example that showcases a minimal set of Dagster features.