Learn to process datasets larger than memory using Dask's parallel computing capabilities. This project demonstrates how to read multiple log files, perform distributed aggregations, and efficiently process data volumes that standard Pandas could not hold in memory at once.
This example demonstrates how to use Dask for processing large datasets in a data engineering context. It covers:

- Generating sample log files to simulate a large dataset
- Reading many CSV files at once with dask.dataframe
- Running a distributed group-by aggregation
- Computing the result and saving it to a summary CSV
import dask.dataframe as dd
import pandas as pd
import numpy as np
import os

script_dir = os.path.dirname(os.path.abspath(__file__))
logs_dir = os.path.join(script_dir, 'logs')
This section imports the necessary modules and sets up the directory structure for our log files. It uses os.path.dirname(os.path.abspath(__file__)) to get the absolute path of the directory containing the script. This allows you to run the script from any location, and it will always create and read files relative to its own location.
def create_sample_logs(num_files, rows_per_file):
    for i in range(num_files):
        df = pd.DataFrame({
            'timestamp': pd.date_range(start='2023-01-01', periods=rows_per_file, freq='S'),
            'user_id': np.random.randint(1, 1001, rows_per_file),
            'action': np.random.choice(['click', 'view', 'purchase'], rows_per_file),
            'value': np.random.randint(1, 100, rows_per_file)
        })
        file_path = os.path.join(logs_dir, f'log_file_{i}.csv')
        df.to_csv(file_path, index=False)

os.makedirs(logs_dir, exist_ok=True)
create_sample_logs(num_files=3, rows_per_file=10000)
This function creates sample log files to simulate large datasets. It generates CSV files with timestamps, user IDs, actions, and values.
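If you want to confirm that the files landed where you expect before moving on, a quick check using only the standard library works; the exact names and sizes printed depend on the parameters passed above.

# Optional sanity check: list the generated log files and their sizes.
for name in sorted(os.listdir(logs_dir)):
    path = os.path.join(logs_dir, name)
    print(name, os.path.getsize(path), 'bytes')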
logs_pattern = os.path.join(logs_dir, '*.csv')
logs = dd.read_csv(logs_pattern)

user_activity = logs.groupby('user_id').agg({
    'action': 'count',
    'value': 'sum'
}).reset_index()

result = user_activity.compute()
Here, Dask lazily reads all CSV files matching the pattern as a single distributed DataFrame. The group-by aggregation only builds a task graph; no data is actually processed until compute() is called, which runs the graph in parallel and returns an ordinary Pandas DataFrame.
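Because everything up to compute() is lazy, you can inspect the structure of the computation before paying the cost of running it. A small sketch of checks that can be useful; these attributes and methods are part of the standard dask.dataframe API.

# Inspect the lazy collections before triggering any heavy work.
print(logs.npartitions)        # how many chunks Dask will process in parallel
print(user_activity.dtypes)    # the output schema is known without a full pass over the data
print(logs.head())             # head() only reads from the first partition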
output_file = os.path.join(script_dir, 'user_activity_summary.csv')
result.to_csv(output_file, index=False)
Finally, the results are saved to a CSV file.
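In this example the aggregated result is small enough to compute into Pandas first. If the result were itself large, Dask can write the output without materializing it in memory; a minimal sketch using dask.dataframe's own to_csv (single_file=True merges the partitions into one file, while a 'part-*.csv' pattern would write one file per partition):

# Alternative: let Dask write the output instead of computing to Pandas first.
user_activity.to_csv(
    os.path.join(script_dir, 'user_activity_summary_dask.csv'),
    single_file=True,
    index=False
)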
To run this example:
Ensure you have Dask installed:
pip install "dask[complete]"
Save the Python code in a file, e.g., 'dask_example.py'
Run the script:
python dask_example.py
The script will create sample log files, process them using Dask, and generate a 'user_activity_summary.csv' file with the results.
This example showcases how Dask can be used in data engineering tasks to efficiently process large datasets that may not fit into memory.
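If the log files grow beyond what the default scheduler handles comfortably, the same code can be pointed at Dask's distributed scheduler. A minimal sketch, assuming dask[complete] (which includes dask.distributed) is installed; the worker counts, memory limit, and blocksize below are illustrative choices, not prescriptive settings.

from dask.distributed import Client

if __name__ == '__main__':
    # Start a local cluster; the dashboard link is handy for watching progress.
    client = Client(n_workers=4, threads_per_worker=1, memory_limit='2GB')
    print(client.dashboard_link)

    # Smaller partitions keep per-worker memory usage tighter on large files.
    logs = dd.read_csv(logs_pattern, blocksize='64MB')
    result = logs.groupby('user_id').agg({'action': 'count', 'value': 'sum'}).reset_index().compute()

    client.close()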