Build Pythonic stream processing applications using Faust, a library designed specifically for Python developers. Learn async stream processing, stateful operations, and how to create real-time data pipelines with familiar Python syntax.
This document explains the Faust example provided in faust_example.py.
The example demonstrates how to:

- Connect a Faust application to a Kafka broker
- Define structured data models with Faust Records
- Process a continuous stream of orders asynchronously
- Maintain running per-category revenue totals with a Faust Table
- Produce derived events to a downstream topic

Let's walk through the code.
```python
import faust

app = faust.App('ecommerce-analytics', broker='kafka://localhost:9092')
```
This code imports the Faust library and creates a Faust application named 'ecommerce-analytics'. It specifies the Kafka broker address the app uses for communication.
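The example uses only the app name and broker URL, but `faust.App` accepts a number of other settings. A hedged sketch of some commonly tuned options (the values below are illustrative, not part of faust_example.py):

```python
import faust

app = faust.App(
    'ecommerce-analytics',
    broker='kafka://localhost:9092',
    value_serializer='json',   # default serializer for topic values
    store='memory://',         # table storage; 'rocksdb://' is typical in production
    topic_partitions=1,        # default partition count for internally created topics
)
```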
```python
class Order(faust.Record):
    order_id: str
    product_id: str
    category: str
    price: float
    quantity: int

class CategoryRevenue(faust.Record):
    category: str
    total_revenue: float
```
Here, we define two data models using Faust Records:
- `Order`: Represents an individual order with its details.
- `CategoryRevenue`: Represents the total revenue for a product category.

```python
orders_topic = app.topic('orders', value_type=Order)
category_revenue_topic = app.topic('category-revenue', value_type=CategoryRevenue)

category_revenue_table = app.Table('category_revenue', default=float)
```
This code defines:

- `orders_topic`: the input topic from which incoming orders are consumed, deserialized as `Order` records.
- `category_revenue_topic`: the output topic to which updated revenue totals are published as `CategoryRevenue` records.
- `category_revenue_table`: a Faust Table, a changelog-backed key-value store that keeps a running revenue total per category (defaulting to `0.0`).
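Beyond plain tables, Faust also supports windowed tables for time-bucketed aggregations. This is not part of the example, but a hedged sketch of a tumbling-window variant of the revenue table could look like this:

```python
from datetime import timedelta

# Revenue per category per hour, with buckets kept for 24 hours
# (illustrative only; the example uses a plain, unwindowed table).
hourly_category_revenue = app.Table(
    'hourly_category_revenue', default=float
).tumbling(timedelta(hours=1), expires=timedelta(hours=24))
```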
```python
@app.agent(orders_topic)
async def process_order(orders):
    async for order in orders:
        revenue = order.price * order.quantity
        category_revenue_table[order.category] += revenue
        updated_revenue = CategoryRevenue(
            category=order.category,
            total_revenue=category_revenue_table[order.category]
        )
        await category_revenue_topic.send(value=updated_revenue)
        print(f"Processed order {order.order_id} for category {order.category}")
```
This function defines a Faust agent that:

- Subscribes to the 'orders' topic and iterates over incoming orders asynchronously.
- Calculates the revenue for each order (price × quantity).
- Adds that revenue to the category's running total in `category_revenue_table`.
- Sends a `CategoryRevenue` object with the updated total to the 'category-revenue' topic.

```python
if __name__ == '__main__':
    app.main()
```
This block runs the Faust application when the script is executed directly; `app.main()` hands control to Faust's command-line interface, which is why the script accepts subcommands such as `worker`.
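Because the agent and table live at module level, you can also exercise the agent without a broker. A hedged sketch using Faust's `test_context()` helper (assumes pytest with pytest-asyncio installed; sends to the output topic are buffered while no producer is running):

```python
# test_faust_example.py -- illustrative file name, not part of the repo.
import pytest
from faust_example import Order, process_order, category_revenue_table

@pytest.mark.asyncio
async def test_process_order_updates_table():
    async with process_order.test_context() as agent:
        order = Order(order_id='o1', product_id='p1',
                      category='books', price=10.0, quantity=2)
        # put() feeds one event through the agent as if it arrived on 'orders'
        await agent.put(order)
        assert category_revenue_table['books'] == 20.0
```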
To run this example:
Ensure you have Faust installed. Note that `faust-streaming` is the actively maintained fork of the original `faust` package and provides the same `faust` module, so install the fork rather than both (installing `faust` and `faust-streaming` together can conflict, since both provide the `faust` module); `kafka-python` is handy for plain producer/consumer test scripts:

```bash
pip install faust-streaming kafka-python
```
Make sure you have a Kafka broker running at localhost:9092.
With Kafka running, you can start your Faust worker:
```bash
python faust_example.py worker
```
Kafka producer and consumer apps are provided for testing this application. Check their respective documents to see how they work and how to use them.
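For a quick manual test without those apps, a minimal producer sketch using kafka-python could feed sample orders into the 'orders' topic (the file name and sample values are illustrative, not part of the provided apps):

```python
# send_test_orders.py -- illustrative, not one of the provided test apps.
import json
import random
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers='localhost:9092',
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
)

categories = ['electronics', 'books', 'clothing']
for i in range(10):
    order = {
        'order_id': f'order-{i}',
        'product_id': f'product-{random.randint(1, 100)}',
        'category': random.choice(categories),
        'price': round(random.uniform(5.0, 200.0), 2),
        'quantity': random.randint(1, 5),
    }
    producer.send('orders', value=order)

producer.flush()
```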
The script will start processing orders from the 'orders' topic, updating category revenues, and producing results to the 'category-revenue' topic.
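To watch those results arrive, an equally minimal consumer sketch can subscribe to the output topic (again illustrative, not the provided consumer app):

```python
# watch_category_revenue.py -- illustrative, not one of the provided test apps.
import json
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'category-revenue',
    bootstrap_servers='localhost:9092',
    auto_offset_reset='earliest',
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
)

for message in consumer:
    print(message.value)  # each updated CategoryRevenue as a JSON dict
```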
The example showcases several key concepts:

- **Stream Processing**: Demonstrates Faust's capability to process continuous streams of data in real time.
- **Data Modeling**: Shows how to use Faust Records to define structured data models, ensuring type safety and clear data representation.
- **Kafka Integration**: Illustrates seamless integration with Kafka topics for both consuming and producing events.
- **Stateful Computations**: Demonstrates the use of Faust Tables for maintaining state across events, allowing for aggregations and cumulative calculations.
- **Asynchronous Processing**: Leverages Python's async/await syntax for efficient, non-blocking stream processing.
- **Event Production**: Shows how to produce derived events to a new topic based on processed data.
- **Real-time Analytics**: Implements a simple real-time analytics scenario by calculating and updating category revenues on the fly.
This example provides a foundation for building more complex stream processing applications with Faust, demonstrating key concepts of event-driven architectures and real-time data processing.
The Faust example script (faust_example.py) requires a running Kafka broker to function. To simplify the setup and ensure consistency across different systems, we use Docker Compose to run Kafka.
The Docker setup for this example is defined in the docker-compose.yml file, which is explained in detail in the Faust Kafka Docker Compose Explanation document. This Docker Compose configuration sets up a Kafka broker, automatically creates the necessary topics, and provides a web UI for monitoring the Kafka cluster.
To run the Faust example:
First, start the Kafka environment using Docker Compose:
```bash
docker-compose up -d
```
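Before starting the worker, you can confirm the containers are up:

```bash
docker-compose ps
```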
Once Kafka is running, you can start the Faust worker (equivalent to running `python faust_example.py worker`, here with info-level logging enabled):

```bash
faust -A faust_example worker -l info
```
You can monitor the Kafka cluster and topics through the Kafka UI at http://localhost:8080.
After you're done, stop the Kafka environment:
```bash
docker-compose down
```
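If you also want to discard any Kafka data persisted between runs, remove the associated volumes as well (assuming the compose file declares them):

```bash
docker-compose down -v
```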
By using this Docker setup, you ensure that the Faust example has a properly configured Kafka environment to connect to, regardless of your local system setup. This approach makes it easier to develop, test, and demonstrate Faust applications consistently across different machines.
For more details on the Kafka Docker setup, please refer to the Faust Kafka Docker Compose Explanation document.