Build a complete ETL pipeline using PySpark to process e-commerce data, including sales analysis, customer segmentation, and product performance metrics. Learn how to leverage Spark's distributed processing capabilities for large-scale data transformations.
This example demonstrates how to use PySpark to perform ETL (Extract, Transform, Load) operations on e-commerce data. It showcases:

- Reading data from CSV and Parquet files
- Cleaning records and deriving date components
- Aggregating sales by day, category, and product
- Joining sales data with product data and computing profit margins
- Segmenting customers by total spend
- Writing partitioned Parquet output
```python
import os

from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, year, month, dayofmonth, hour, sum, avg, count, when
)

current_dir = os.path.dirname(os.path.abspath(__file__))
data_dir = os.path.join(current_dir, "data")
output_dir = os.path.join(current_dir, "output")
```
This section imports the required modules and defines the input and output directory paths relative to the script's location.
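If you want the script to fail fast when the input files are missing, a small guard like the following can help; it is a minimal sketch that assumes the file names used later in this example:

```python
# Optional sanity check: fail early if the expected input files are missing.
expected_inputs = [
    os.path.join(data_dir, "sales_data.csv"),
    os.path.join(data_dir, "product_data.parquet"),
]
missing = [path for path in expected_inputs if not os.path.exists(path)]
if missing:
    raise FileNotFoundError(f"Missing input files: {missing}")
```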
```python
spark = SparkSession.builder \
    .appName("E-commerce Data Processing") \
    .master("local[*]") \
    .getOrCreate()
```
Here, we create a Spark session, which is the entry point for PySpark functionality. The `local[*]` master runs Spark locally using all available CPU cores.
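For small local runs you may want to tune a couple of settings when building the session; the values below are illustrative assumptions, not requirements:

```python
# Illustrative local-mode tweaks; adjust or omit as needed.
spark = (
    SparkSession.builder
    .appName("E-commerce Data Processing")
    .master("local[*]")
    # Fewer shuffle partitions keep small local jobs snappy.
    .config("spark.sql.shuffle.partitions", "8")
    .getOrCreate()
)

# Reduce console noise from Spark's default INFO logging.
spark.sparkContext.setLogLevel("WARN")
```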
```python
sales_df = spark.read.csv(
    os.path.join(data_dir, "sales_data.csv"),
    header=True,
    inferSchema=True
)

cleaned_sales = sales_df.dropna() \
    .filter(col("total_amount") > 0) \
    .withColumn("sale_date", col("sale_timestamp").cast("date")) \
    .withColumn("sale_year", year("sale_timestamp")) \
    .withColumn("sale_month", month("sale_timestamp")) \
    .withColumn("sale_day", dayofmonth("sale_timestamp")) \
    .withColumn("sale_hour", hour("sale_timestamp"))
```
This code loads the sales data from CSV, drops rows containing null values, filters out records with a non-positive total amount, and derives date and time components from the sale timestamp.
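On larger files, `inferSchema=True` triggers an extra pass over the data just to guess types. If you know the layout in advance, you can supply an explicit schema instead; the column types below are assumptions based on how the columns are used in this example, so adjust them to match your actual data:

```python
from pyspark.sql.types import (
    StructType, StructField, StringType, IntegerType, DoubleType, TimestampType
)

# Assumed schema: the names follow the columns used in this example,
# but the real types in your CSV may differ.
sales_schema = StructType([
    StructField("order_id", StringType(), True),
    StructField("customer_id", StringType(), True),
    StructField("product_id", StringType(), True),
    StructField("product_name", StringType(), True),
    StructField("product_category", StringType(), True),
    StructField("quantity", IntegerType(), True),
    StructField("total_amount", DoubleType(), True),
    StructField("sale_timestamp", TimestampType(), True),
])

sales_df = spark.read.csv(
    os.path.join(data_dir, "sales_data.csv"),
    header=True,
    schema=sales_schema  # avoids the extra pass for type inference
)
```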
```python
daily_sales = cleaned_sales.groupBy("sale_date", "product_category", "sale_year", "sale_month") \
    .agg(
        sum("total_amount").alias("total_sales"),
        avg("total_amount").alias("avg_sale_value"),
        count("order_id").alias("num_orders")
    )

top_products = cleaned_sales.groupBy("product_id", "product_name") \
    .agg(sum("quantity").alias("total_quantity_sold")) \
    .orderBy(col("total_quantity_sold").desc()) \
    .limit(10)
```
These operations aggregate the cleaned sales data: daily totals, average sale value, and order counts per product category, plus the ten best-selling products by total quantity sold.
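If you want the top sellers per category rather than overall, a window function is a common alternative. This is a sketch that reuses the column names from the example above:

```python
from pyspark.sql.functions import row_number
from pyspark.sql.window import Window

# Rank products by quantity sold within each category, keep the top 3 per category.
category_window = Window.partitionBy("product_category") \
    .orderBy(col("total_quantity_sold").desc())

top_products_per_category = cleaned_sales \
    .groupBy("product_category", "product_id", "product_name") \
    .agg(sum("quantity").alias("total_quantity_sold")) \
    .withColumn("rank", row_number().over(category_window)) \
    .filter(col("rank") <= 3)
```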
```python
product_df = spark.read.parquet(os.path.join(data_dir, "product_data.parquet"))

sales_with_product_info = cleaned_sales.join(
    product_df,
    on="product_id",
    how="left"
)

sales_with_margins = sales_with_product_info.withColumn(
    "profit_margin",
    (col("total_amount") - col("cost_price") * col("quantity")) / col("total_amount")
)

customer_segments = sales_with_margins.groupBy("customer_id") \
    .agg(
        sum("total_amount").alias("total_spent"),
        avg("profit_margin").alias("avg_profit_margin"),
        count("order_id").alias("num_orders")
    ) \
    .withColumn(
        "customer_segment",
        when(col("total_spent") > 1000, "High Value")
        .when(col("total_spent") > 500, "Medium Value")
        .otherwise("Low Value")
    )
```
This section joins the sales data with the product data, calculates a profit margin per sale, and segments customers into High, Medium, and Low Value tiers based on their total spending.
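If the product table is small enough to fit in memory on each executor (an assumption that depends on your data), a broadcast hint can avoid a shuffle during the join:

```python
from pyspark.sql.functions import broadcast

# Broadcast the (presumably small) product dimension table to avoid a shuffle join.
sales_with_product_info = cleaned_sales.join(
    broadcast(product_df),
    on="product_id",
    how="left"
)
```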
```python
daily_sales.write \
    .partitionBy("sale_year", "sale_month") \
    .parquet(os.path.join(output_dir, "daily_sales"), mode="overwrite")

top_products.write \
    .parquet(os.path.join(output_dir, "top_products"), mode="overwrite")

customer_segments.write \
    .parquet(os.path.join(output_dir, "customer_segments"), mode="overwrite")
```
Finally, the results are written to Parquet files, with daily sales partitioned by year and month.
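Because the daily sales output is partitioned by year and month, readers can prune partitions with a simple filter. A quick sketch (the year and month values are placeholders):

```python
# Read back only one month of daily sales; Spark prunes the other partitions.
january_sales = (
    spark.read.parquet(os.path.join(output_dir, "daily_sales"))
    .filter((col("sale_year") == 2024) & (col("sale_month") == 1))
)
january_sales.show(5)
```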
To run this example:
1. Ensure you have PySpark installed:

   ```bash
   pip install pyspark
   ```

2. Make sure the input data files (`sales_data.csv` and `product_data.parquet`) are in the `data` directory.

3. Run the script:

   ```bash
   python pyspark_example.py
   ```
The script will process the input data and write the results to the output directory in Parquet format.
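To spot-check the results after a run, you can load the outputs back into a Spark session (or any Parquet-aware tool). A minimal sketch:

```python
# Quick sanity check on the generated customer segments.
segments = spark.read.parquet(os.path.join(output_dir, "customer_segments"))
segments.groupBy("customer_segment").count().show()
```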