Build event-driven data pipelines using Apache Kafka with the confluent-kafka Python client. Learn to produce and consume messages, handle topics, and create the foundation for real-time data streaming architectures.
This document explains the Kafka example provided in kafka_example.py.
The example demonstrates how to create Kafka topics programmatically, produce and consume messages, and chain a consumer and producer into a simple real-time processing pipeline with basic analytics.
```python
import json
import random
from datetime import datetime

from confluent_kafka import Producer, Consumer, KafkaError
from confluent_kafka.admin import AdminClient, NewTopic

kafka_config = {
    'bootstrap.servers': 'localhost:9092',
    'client.id': 'ecommerce-data-processor'
}
```
This section imports necessary modules and sets up the Kafka configuration. The kafka_config dictionary specifies the Kafka broker address and a client ID.
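In a production setup you would typically layer delivery and batching settings on top of this base configuration. The sketch below shows one plausible set of additions; the specific values are illustrative assumptions, not requirements of the example.

```python
# Illustrative reliability/batching tuning for the producer (assumed values).
producer_config = {
    **kafka_config,
    'acks': 'all',                # wait for all in-sync replicas to acknowledge
    'enable.idempotence': True,   # prevent duplicates when retries occur
    'compression.type': 'gzip',   # compress message batches on the wire
    'linger.ms': 5,               # wait briefly to batch more messages together
}
```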
```python
admin_client = AdminClient(kafka_config)

topic_list = [
    NewTopic("raw_orders", num_partitions=3, replication_factor=1),
    NewTopic("processed_orders", num_partitions=3, replication_factor=1),
    NewTopic("order_analytics", num_partitions=1, replication_factor=1)
]

admin_client.create_topics(topic_list)
```
Here, we use the AdminClient to create three Kafka topics: 'raw_orders', 'processed_orders', and 'order_analytics'. Each topic is created with specified partitions and replication factors.
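Note that create_topics() is asynchronous: it returns a dictionary mapping each topic name to a future rather than blocking until the topics exist. A minimal sketch of waiting on those futures and tolerating topics left over from a previous run:

```python
from confluent_kafka import KafkaException

futures = admin_client.create_topics(topic_list)
for topic, future in futures.items():
    try:
        future.result()  # block until the broker confirms creation (or fails)
        print(f"Created topic {topic}")
    except KafkaException as e:
        # Typically a "topic already exists" error on repeated runs; safe to continue.
        print(f"Topic {topic} not created: {e}")
```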
```python
def produce_order(producer):
    order = {
        'order_id': random.randint(1000, 9999),
        'user_id': random.randint(1, 100),
        'product_id': random.randint(1, 1000),
        'quantity': random.randint(1, 5),
        'price': round(random.uniform(10, 1000), 2),
        'timestamp': datetime.now().isoformat()
    }
    producer.produce('raw_orders', key=str(order['order_id']), value=json.dumps(order))
    print(f"Produced: {order}")
    producer.flush()
```
This function simulates incoming orders by generating random order data and producing it to the 'raw_orders' topic.
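Since produce() only enqueues the message locally, a delivery report callback is the usual way to confirm that the broker actually accepted it. A hedged sketch follows; the delivery_report function is an addition for illustration, not part of the original script:

```python
def delivery_report(err, msg):
    # Invoked from producer.poll()/flush() once the broker has responded.
    if err is not None:
        print(f"Delivery failed for key {msg.key()}: {err}")
    else:
        print(f"Delivered to {msg.topic()} [{msg.partition()}] at offset {msg.offset()}")

# Inside produce_order, the callback would be attached to the produce() call:
# producer.produce('raw_orders', key=str(order['order_id']),
#                  value=json.dumps(order), on_delivery=delivery_report)
```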
```python
def process_orders(consumer, producer):
    while True:
        msg = consumer.poll(1.0)
        if msg is None:
            continue
        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                continue
            else:
                print(f"Consumer error: {msg.error()}")
                break

        order = json.loads(msg.value())
        print(f"Consumed: {order}")

        processed_order = order.copy()
        processed_order['total_value'] = order['quantity'] * order['price']
        processed_order['status'] = 'processed'

        producer.produce('processed_orders', key=str(order['order_id']),
                         value=json.dumps(processed_order))
        update_analytics(processed_order, producer)
```
This function continuously consumes messages from the 'raw_orders' topic, processes them by adding a total value and status, and then produces the processed order to the 'processed_orders' topic. It also calls update_analytics to update real-time analytics.
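The consumer here relies on the client's default automatic offset commits. If you want offsets committed only after the processed order has been forwarded (at-least-once processing), one sketch is to disable auto-commit and commit per message; the extra settings below are assumptions, not part of the original script:

```python
from confluent_kafka import Consumer

manual_config = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'order-processing-group',
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': False,   # take over offset management explicitly
}
consumer = Consumer(manual_config)
consumer.subscribe(['raw_orders'])

msg = consumer.poll(1.0)
if msg is not None and not msg.error():
    # ... process and produce the result, as in process_orders ...
    consumer.commit(message=msg, asynchronous=False)  # commit this offset synchronously
```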
```python
def update_analytics(order, producer):
    analytics_update = {
        'timestamp': datetime.now().isoformat(),
        'total_orders': 1,
        'total_value': order['total_value']
    }
    producer.produce('order_analytics', value=json.dumps(analytics_update))
```
This function simulates updating real-time analytics by producing a message to the 'order_analytics' topic for each processed order.
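Each record on 'order_analytics' carries only a single order's contribution, so a downstream consumer has to aggregate them into running totals. A minimal sketch of such an aggregator (the 'analytics-aggregator' group id is an assumed name, not part of the original script):

```python
import json
from confluent_kafka import Consumer

agg_consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'analytics-aggregator',  # assumed group name
    'auto.offset.reset': 'earliest',
})
agg_consumer.subscribe(['order_analytics'])

total_orders, total_value = 0, 0.0
try:
    while True:
        msg = agg_consumer.poll(1.0)
        if msg is None or msg.error():
            continue
        update = json.loads(msg.value())
        total_orders += update['total_orders']
        total_value += update['total_value']
        print(f"Running totals: {total_orders} orders, {total_value:.2f} total value")
except KeyboardInterrupt:
    pass
finally:
    agg_consumer.close()
```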
```python
if __name__ == "__main__":
    producer = Producer(kafka_config)

    for _ in range(10):
        produce_order(producer)

    consumer_config = kafka_config.copy()
    consumer_config.update({
        'group.id': 'order-processing-group',
        'auto.offset.reset': 'earliest'
    })
    consumer = Consumer(consumer_config)
    consumer.subscribe(['raw_orders'])

    try:
        process_orders(consumer, producer)
    except KeyboardInterrupt:
        pass
    finally:
        consumer.close()
```
The main execution sets up a producer, generates 10 sample orders, sets up a consumer, and then starts processing orders. The consumer is configured with a group ID and set to read from the earliest offset. The script runs until interrupted, at which point it cleanly closes the consumer.
To run this example:
Ensure you have Kafka set up and running on localhost:9092.
Install the required Python package:
```bash
pip install confluent-kafka
```
Run the script:
```bash
python kafka_example.py
```
The script will create the necessary Kafka topics, produce sample orders, consume and process these orders, and update analytics in real-time.
Topic Creation: Demonstrates how to programmatically create Kafka topics, which is crucial for setting up a Kafka environment.
Message Production: Shows how to produce messages to Kafka topics, simulating real-world data ingestion.
Message Consumption: Illustrates how to consume messages from Kafka topics, including error handling and offset management.
Message Processing: Demonstrates a simple processing pipeline where raw data is consumed, processed, and then produced to another topic.
Real-time Analytics: Shows a basic implementation of real-time analytics updates, which is a common use case for Kafka in production environments.
Error Handling: Includes basic error handling for consumer errors and graceful shutdown on keyboard interrupt.
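Beyond catching KeyboardInterrupt, long-running consumers are usually also asked to stop via SIGTERM (for example when a container is shut down). One hedged way to extend the example is a flag checked by the poll loop; the handler below is an assumption, not part of the original script:

```python
import signal

running = True

def handle_sigterm(signum, frame):
    # Ask the poll loop to exit on its next iteration.
    global running
    running = False

signal.signal(signal.SIGTERM, handle_sigterm)

# In process_orders, `while True:` would become `while running:`, so the loop
# ends cleanly and the existing `finally: consumer.close()` still runs.
# Calling producer.flush() before exiting also drains any buffered messages.
```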
This example provides a foundation for building more complex Kafka-based data processing applications, demonstrating key concepts of producing, consuming, and processing data in real-time.
This Kafka example uses Docker Compose to run Kafka in a containerized environment, making it easy to set up and run the necessary infrastructure. The Docker Compose configuration and setup instructions are provided in an additional document named Kafka Docker Compose Configuration.
To run this example with Docker Compose:
Start the Kafka broker using the Docker Compose configuration from that document, then run the kafka_example.py script as described in the "Running the Example" section above. Using Docker Compose ensures a consistent and isolated environment for running Kafka, simplifying the setup process and making it easier to replicate the example on different systems.