Process unbounded data streams using Apache Flink and PyFlink. Learn stateful computations, event-time processing, and windowing operations, all essential for building sophisticated real-time analytics and continuous ETL pipelines.
This document explains the PyFlink example provided in pyflink_example.py.
The example demonstrates how to:

- set up a streaming execution environment and a Table environment
- generate simulated IoT sensor data with the built-in `datagen` connector
- aggregate readings over event-time tumbling windows
- flag temperature anomalies with a user-defined function (UDF)
- print the results to the console with the `print` connector
```python
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, DataTypes, TableDescriptor, Schema
from pyflink.table.expressions import col, lit
from pyflink.table.udf import udf
from pyflink.table.window import Tumble

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
t_env = StreamTableEnvironment.create(env)
```
This section imports necessary PyFlink modules and sets up the execution environment. It creates a StreamExecutionEnvironment and a StreamTableEnvironment with a parallelism of 1.
```python
t_env.create_temporary_table(
    'sensor_data',
    TableDescriptor.for_connector('datagen')
        .schema(Schema.new_builder()
                .column('sensor_id', DataTypes.STRING())
                .column('temperature', DataTypes.FLOAT())
                .column('humidity', DataTypes.FLOAT())
                .column('event_time', DataTypes.TIMESTAMP_LTZ(3))
                .watermark('event_time', "event_time - INTERVAL '5' SECOND")
                .build())
        .option('rows-per-second', '5')
        .option('fields.sensor_id.kind', 'random')
        .option('fields.temperature.min', '20.0')
        .option('fields.temperature.max', '30.0')
        .option('fields.humidity.min', '30.0')
        .option('fields.humidity.max', '70.0')
        .build()
)
```
This code defines a temporary table sensor_data using a data generator connector. It specifies the schema of the data, including a watermark for event time processing, and sets options for generating random sensor data.
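For comparison, the same source table could be declared with SQL DDL instead of the `TableDescriptor` API. The following is a sketch that mirrors the definition above; it is not part of pyflink_example.py, and the table name `sensor_data_sql` is an illustrative choice:

```python
# A sketch of the equivalent SQL DDL for the datagen source.
t_env.execute_sql("""
    CREATE TEMPORARY TABLE sensor_data_sql (
        sensor_id STRING,
        temperature FLOAT,
        humidity FLOAT,
        event_time TIMESTAMP_LTZ(3),
        WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
    ) WITH (
        'connector' = 'datagen',
        'rows-per-second' = '5',
        'fields.temperature.min' = '20.0',
        'fields.temperature.max' = '30.0',
        'fields.humidity.min' = '30.0',
        'fields.humidity.max' = '70.0'
    )
""")
```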
```python
@udf(result_type=DataTypes.BOOLEAN())
def is_temperature_anomaly(temp, avg_temp):
    return abs(temp - avg_temp) > 5.0
```
This UDF flags a reading as anomalous when the temperature deviates from the window's average by more than 5 degrees.
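If you also wanted to call the function from SQL queries, it could be registered by name; a minimal sketch (the registered name `is_temp_anomaly` is illustrative, not part of the original script):

```python
# Optional: register the UDF so it is callable from SQL as well,
# e.g. SELECT is_temp_anomaly(temperature, 25.0) FROM sensor_data
t_env.create_temporary_function('is_temp_anomaly', is_temperature_anomaly)
```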
```python
result_table = t_env.from_path('sensor_data') \
    .window(Tumble.over(lit(10).seconds).on(col("event_time")).alias("w")) \
    .group_by(col("sensor_id"), col("w")) \
    .select(
        col("sensor_id").alias("result_sensor_id"),
        col("w").start.alias("window_start"),
        col("w").end.alias("window_end"),
        col("temperature").avg.alias("avg_temperature"),
        col("humidity").avg.alias("avg_humidity")
    )
```
This section processes the sensor data by applying a tumbling window of 10 seconds, grouping by sensor ID and window, and calculating average temperature and humidity for each window.
```python
final_result = result_table \
    .join(t_env.from_path('sensor_data')) \
    .where((col('result_sensor_id') == col('sensor_id')) &
           (col('window_start') <= col('event_time')) &
           (col('window_end') > col('event_time'))) \
    .select(
        col("result_sensor_id"),
        col("window_start"),
        col("window_end"),
        col("avg_temperature"),
        col("avg_humidity"),
        is_temperature_anomaly(
            col("temperature"),
            col("avg_temperature")
        ).alias("is_anomaly")
    )
```
This code joins the windowed averages with the original sensor data and applies the anomaly detection UDF to identify temperature anomalies.
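If only the anomalous readings are of interest, the joined table could be filtered before it reaches the sink. A minimal sketch (the variable `anomalies_only` is hypothetical, not part of pyflink_example.py):

```python
# A sketch: keep only the rows the UDF flagged as anomalous.
anomalies_only = final_result.where(col("is_anomaly"))
```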
```python
t_env.execute_sql("""
    CREATE TABLE print_sink (
        sensor_id STRING,
        window_start TIMESTAMP(3),
        window_end TIMESTAMP(3),
        avg_temperature FLOAT,
        avg_humidity FLOAT,
        is_anomaly BOOLEAN
    ) WITH (
        'connector' = 'print'
    )
""")

final_result.execute_insert('print_sink').wait()
```
This section creates a print sink table and inserts the final results into it, which will print the results to the console.
Note that no separate `env.execute()` call is needed here: `execute_insert('print_sink').wait()` already submits the Table API job and blocks until it finishes. A call like `env.execute("IoT Sensor Data Processing")` is only required for pipelines built with the DataStream API, and invoking it when no DataStream transformations have been defined typically raises an error.
To run this example:
Ensure you have PyFlink installed:

```bash
pip install apache-flink
```
Run the script:

```bash
python pyflink_example.py
```
The script will process simulated IoT sensor data, detect temperature anomalies, and print the results to the console.
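The `print` connector writes one line per row using Flink's standard `+I[...]` changelog encoding. The output resembles the following; the values shown are illustrative, not captured from a real run:

```
+I[a3bc5, 2024-01-01 12:00:00.000, 2024-01-01 12:00:10.000, 24.7, 51.2, false]
+I[f91d2, 2024-01-01 12:00:00.000, 2024-01-01 12:00:10.000, 28.9, 44.8, true]
```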
- **Streaming Data Processing:** Demonstrates PyFlink's capability to process continuous streams of data in real time.
- **Windowed Operations:** Shows how to apply time-based windows to aggregate streaming data, which is crucial for many real-time analytics scenarios (a sliding-window variant is sketched after this list).
- **User-Defined Functions (UDFs):** Illustrates the use of UDFs to implement custom logic (anomaly detection) within the data processing pipeline.
- **Event Time Processing:** Uses event time and watermarks to handle out-of-order events, which is essential for accurate time-based operations in distributed systems.
- **Data Generation:** Shows how to use Flink's built-in `datagen` connector for testing and development purposes.
- **Table API:** Demonstrates the use of PyFlink's Table API, which provides a higher-level abstraction for stream processing compared to the DataStream API.
- **Join Operations:** Illustrates how to join streams of data, which is a common requirement in many real-world data processing scenarios.
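As referenced in the list above, the same aggregation could use a sliding window instead of a tumbling one, so that each reading contributes to several overlapping windows. A minimal sketch, assuming the same `sensor_data` table (the variable `sliding_result` is illustrative):

```python
from pyflink.table.window import Slide

# A sketch: 1-minute windows that advance every 10 seconds, so each
# reading falls into six overlapping windows.
sliding_result = t_env.from_path('sensor_data') \
    .window(Slide.over(lit(1).minutes)
                 .every(lit(10).seconds)
                 .on(col("event_time"))
                 .alias("w")) \
    .group_by(col("sensor_id"), col("w")) \
    .select(
        col("sensor_id"),
        col("w").start.alias("window_start"),
        col("temperature").avg.alias("avg_temperature")
    )
```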
This example provides a foundation for building more complex stream processing applications with PyFlink, demonstrating key concepts of real-time data processing, windowing, and anomaly detection.
The PyFlink example script (pyflink_example.py) requires a running Flink environment to execute. To simplify the setup and ensure consistency across different systems, we use Docker to run a Flink cluster.
The Docker setup for this example is defined in the docker-compose.yml file, which is explained in detail in the Flink Docker Compose Explanation document. This Docker Compose configuration sets up a complete Flink cluster with a Job Manager and Task Manager, providing the necessary environment to run the PyFlink script.
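The actual file is documented separately, but a typical Flink cluster definition follows this shape. The following is a sketch based on the standard Flink Docker images, not a copy of the project's docker-compose.yml:

```yaml
# A sketch of a typical Flink cluster definition; the project's actual
# docker-compose.yml may differ (see the Flink Docker Compose Explanation).
version: "3"
services:
  jobmanager:
    image: flink:latest
    ports:
      - "8081:8081"   # Flink Web UI
    command: jobmanager
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
  taskmanager:
    image: flink:latest
    depends_on:
      - jobmanager
    command: taskmanager
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
```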
To run the PyFlink example:
First, start the Flink cluster using Docker Compose:

```bash
docker-compose up -d
```
Once the Flink cluster is running, you can execute the PyFlink script:

```bash
python pyflink_example.py
```
You can monitor the job execution and results through the Flink Web UI at http://localhost:8081.
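When the job runs on the Docker cluster rather than a local mini-cluster, the `print` connector's output lands in the TaskManager's stdout. One way to follow it, assuming the compose service is named `taskmanager` as in the sketch above:

```bash
# Follow the TaskManager logs, where the print sink's rows appear.
docker-compose logs -f taskmanager
```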
After you're done, stop the Flink cluster:

```bash
docker-compose down
```
By using this Docker setup, you ensure that the PyFlink example runs in a properly configured Flink environment, regardless of your local system setup. This approach makes it easier to develop, test, and demonstrate Flink applications consistently across different machines.