Create a simple, flexible REST API using Flask's minimalist approach. Perfect for learning API fundamentals, microservices, or rapid prototyping. Learn routing, request handling, and how to structure lightweight data services.
This document explains the Flask API example provided for data ingestion and retrieval in a data engineering context.
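The example demonstrates how to protect endpoints with HTTP Basic authentication, ingest and clean JSON records with pandas, cache responses, and return daily aggregates for a dataset.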
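```python
from flask import Flask, request, jsonify
from flask_httpauth import HTTPBasicAuth
from functools import wraps
import pandas as pd
from datetime import datetime
import logging

app = Flask(__name__)
auth = HTTPBasicAuth()

# Log INFO and above; logger.debug() calls are suppressed at this level
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# In-memory storage: lost on restart and not shared between worker processes
data_store = {}
cache = {}

# Demo credentials only; never keep plaintext passwords in real code
users = {
    "admin": "password123"
}
```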
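This section imports the necessary modules, initializes the Flask app, and sets up HTTP Basic authentication, logging, an in-memory data store (data_store), and a response cache (cache). Both dictionaries live in process memory, so their contents are lost when the server restarts and are not shared between worker processes.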
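One consequence of logging.basicConfig(level=logging.INFO) is that the logger.debug(...) calls in the endpoints are filtered out, because DEBUG ranks below INFO. The standard-library sketch below (make_logger is a hypothetical helper, not part of the example) shows the level threshold in isolation.

```python
import io
import logging


def make_logger(name: str, level: int, stream: io.StringIO) -> logging.Logger:
    """Hypothetical helper: a logger that writes to `stream` at the given level."""
    logger = logging.getLogger(name)
    logger.setLevel(level)  # records below this level are discarded
    handler = logging.StreamHandler(stream)
    handler.setFormatter(logging.Formatter("%(levelname)s %(message)s"))
    logger.handlers = [handler]  # replace handlers so repeated calls don't duplicate output
    logger.propagate = False  # keep records away from the root logger's handlers
    return logger
```

With a logger built at logging.INFO, a debug(...) call writes nothing while info(...) is recorded; built at logging.DEBUG, both appear. In the app itself, changing the basicConfig level to logging.DEBUG is enough to surface the data_store diagnostics.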
```python
@auth.verify_password
def verify_password(username, password):
    # Returning the username accepts the request; returning None rejects it
    if username in users and users[username] == password:
        return username
```
This function verifies user credentials for basic HTTP authentication.
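Comparing against plaintext passwords is acceptable for a demo only. The sketch below, standard library only, stores salted PBKDF2 hashes instead and compares them in constant time; hash_password, check_password, and the iteration count are illustrative assumptions. In a Flask app, werkzeug.security's generate_password_hash and check_password_hash provide the same service and can be used inside the @auth.verify_password callback.

```python
import hashlib
import hmac
import os
from typing import Optional

ITERATIONS = 100_000  # assumption: tune to your hardware and security needs


def hash_password(password: str, salt: Optional[bytes] = None) -> str:
    """Return 'salt_hex$digest_hex' computed with PBKDF2-HMAC-SHA256."""
    salt = salt if salt is not None else os.urandom(16)
    digest = hashlib.pbkdf2_hmac("sha256", password.encode("utf-8"), salt, ITERATIONS)
    return f"{salt.hex()}${digest.hex()}"


def check_password(stored: str, password: str) -> bool:
    salt_hex, digest_hex = stored.split("$")
    digest = hashlib.pbkdf2_hmac(
        "sha256", password.encode("utf-8"), bytes.fromhex(salt_hex), ITERATIONS
    )
    # compare_digest takes the same time regardless of where the strings differ
    return hmac.compare_digest(digest.hex(), digest_hex)


# Demo only: in practice, generate the hash once and load it from configuration
users = {"admin": hash_password("password123")}


def verify_password(username: str, password: str) -> Optional[str]:
    """Same contract as the Flask-HTTPAuth callback: username on success, else None."""
    stored = users.get(username)
    if stored and check_password(stored, password):
        return username
    return None
```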
```python
def cache_result(expire_seconds=300):
    """Cache a view's return value per request path for expire_seconds."""
    def decorator(f):
        @wraps(f)
        def wrapper(*args, **kwargs):
            key = request.path
            entry = cache.get(key)
            # total_seconds() is the full age; .seconds alone wraps around after one day
            if entry and (datetime.now() - entry['time']).total_seconds() < expire_seconds:
                return entry['result']
            result = f(*args, **kwargs)
            cache[key] = {'result': result, 'time': datetime.now()}
            return result
        return wrapper
    return decorator
```
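This decorator implements a simple in-memory cache keyed by the request path: a repeat request for the same path within expire_seconds receives the stored response instead of re-running the view function.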
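The same idea can be factored out of Flask into a small, testable class. This sketch is an assumption rather than the article's code: TTLCache is a hypothetical name, it uses time.monotonic (unaffected by wall-clock changes), and it measures age as a float in seconds, which sidesteps the timedelta pitfall where .seconds is only the seconds component (0 to 86399) while total_seconds() is the full duration.

```python
import time
from typing import Any, Callable, Dict, Optional, Tuple


class TTLCache:
    """Hypothetical helper: per-key cache whose entries expire after ttl seconds."""

    def __init__(self, ttl: float, clock: Callable[[], float] = time.monotonic):
        self.ttl = ttl
        self.clock = clock  # injectable so expiry can be tested without sleeping
        self._entries: Dict[str, Tuple[float, Any]] = {}

    def get(self, key: str) -> Optional[Any]:
        """Return the cached value, or None on a miss (a cached None reads as a miss)."""
        entry = self._entries.get(key)
        if entry is None:
            return None
        stored_at, value = entry
        if self.clock() - stored_at >= self.ttl:
            del self._entries[key]  # expired: evict and report a miss
            return None
        return value

    def set(self, key: str, value: Any) -> None:
        self._entries[key] = (self.clock(), value)

    def invalidate(self, key: str) -> None:
        # Call after a write (e.g. a new /ingest) so readers don't see stale data
        self._entries.pop(key, None)
```

Inside the decorator, get(request.path) and set(request.path, result) would replace the dictionary lookups, and the ingest endpoint could call invalidate(f"/data/{dataset}") after storing new records.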
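```python
@app.route('/ingest', methods=['POST'])
@auth.login_required
def ingest_data():
    try:
        # silent=True returns None for a missing or malformed JSON body instead of raising
        data = request.get_json(silent=True)
        if not isinstance(data, dict) or 'dataset' not in data or 'records' not in data:
            return jsonify({"error": "Invalid data format"}), 400

        df = pd.DataFrame(data['records'])
        if not {'timestamp', 'value'}.issubset(df.columns):
            return jsonify({"error": "Each record needs 'timestamp' and 'value'"}), 400

        # Clean and transform: unparseable timestamps/values become NaT/NaN and are dropped
        df['timestamp'] = pd.to_datetime(df['timestamp'], errors='coerce')
        df['value'] = pd.to_numeric(df['value'], errors='coerce')
        df = df.dropna()

        # Convert timestamps to strings so the stored records are JSON-serializable
        df['timestamp'] = df['timestamp'].dt.strftime('%Y-%m-%d %H:%M:%S')

        # Store processed data (replaces any earlier data for this dataset)
        data_store[data['dataset']] = df.to_dict(orient='records')
        # Drop any cached /data/<dataset> response so readers see the new data
        cache.pop(f"/data/{data['dataset']}", None)

        logger.info(f"Ingested {len(df)} records for dataset {data['dataset']}")
        logger.debug(f"Current data_store keys: {list(data_store.keys())}")
        return jsonify({"message": f"Successfully ingested {len(df)} records"}), 201
    except Exception as e:
        logger.error(f"Error in data ingestion: {str(e)}")
        return jsonify({"error": f"Internal server error: {str(e)}"}), 500
```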
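This endpoint validates the payload, cleans the records with pandas (parsing timestamps, converting values to numbers, and dropping rows with missing values), and stores the result under the dataset name, replacing any earlier data stored under that name.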
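Pulling the cleaning step into a pure function makes it easy to unit-test without running Flask. The sketch below mirrors the endpoint's pandas calls; clean_records is a hypothetical name, and unlike the endpoint it keeps only the timestamp and value columns (an assumption made for simplicity).

```python
import pandas as pd


def clean_records(records: list) -> list:
    """Hypothetical mirror of the /ingest cleaning step.

    Unparseable timestamps become NaT and non-numeric values become NaN;
    rows containing either are dropped. Only 'timestamp' and 'value' are kept.
    """
    df = pd.DataFrame(records, columns=["timestamp", "value"])
    df["timestamp"] = pd.to_datetime(df["timestamp"], errors="coerce")
    df["value"] = pd.to_numeric(df["value"], errors="coerce")
    df = df.dropna()
    # Store timestamps as strings, as the endpoint does
    df["timestamp"] = df["timestamp"].dt.strftime("%Y-%m-%d %H:%M:%S")
    return df.to_dict(orient="records")
```

Passing a mix of valid rows, an invalid timestamp, and a non-numeric value returns only the valid rows, with numeric strings such as "15" converted to numbers.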
```python
@app.route('/data/<dataset>', methods=['GET'])
@auth.login_required
@cache_result(expire_seconds=60)
def get_data(dataset):
    logger.debug(f"Attempting to retrieve dataset: {dataset}")
    logger.debug(f"Current data_store keys: {list(data_store.keys())}")
    if dataset not in data_store:
        logger.warning(f"Dataset '{dataset}' not found in data_store")
        return jsonify({"error": f"Dataset '{dataset}' not found"}), 404

    try:
        records = data_store[dataset]
        if not records:
            return jsonify({})  # every ingested row was dropped during cleaning

        # Aggregate values per calendar day
        df = pd.DataFrame(records)
        df['timestamp'] = pd.to_datetime(df['timestamp'])
        df['date'] = df['timestamp'].dt.strftime('%Y-%m-%d')  # string keys for JSON
        result = df.groupby('date')['value'].agg(['mean', 'max', 'min']).to_dict(orient='index')

        # Convert NumPy scalars to Python native types for JSON serialization
        for date, values in result.items():
            result[date] = {k: v.item() if hasattr(v, 'item') else v for k, v in values.items()}

        return jsonify(result)
    except Exception as e:
        logger.error(f"Error in data retrieval: {str(e)}")
        return jsonify({"error": f"Internal server error: {str(e)}"}), 500
```
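This endpoint retrieves a dataset and returns the mean, maximum, and minimum value for each calendar day, keyed by date. It responds with 404 if the dataset has not been ingested.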
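The aggregation can likewise be isolated as a pure function. In this sketch (daily_summary is a hypothetical name) every statistic is cast to a plain float, a simplification of the endpoint's .item() conversion that guarantees JSON-serializable output.

```python
import pandas as pd


def daily_summary(records: list) -> dict:
    """Hypothetical mirror of the /data/<dataset> aggregation: mean/max/min per day."""
    if not records:
        return {}
    df = pd.DataFrame(records)
    day = pd.to_datetime(df["timestamp"]).dt.strftime("%Y-%m-%d")
    stats = df.groupby(day)["value"].agg(["mean", "max", "min"])
    # Cast every statistic to a plain float so the dict is JSON-serializable
    return {
        date: {name: float(val) for name, val in row.items()}
        for date, row in stats.iterrows()
    }
```

For the two records used in the curl example (values 10 and 15 on 2023-05-01), this yields a mean of 12.5, a max of 15.0, and a min of 10.0 for that date.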
```python
if __name__ == '__main__':
    # Flask's built-in development server; not intended for production use
    app.run(host='0.0.0.0', port=5001, debug=True)
```
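This part does the following:

- host='0.0.0.0': tells Flask to listen on all network interfaces, not only on localhost, so other machines can reach the server.
- port=5001: explicitly sets the port to 5001 (Flask's default is 5000).
- debug=True: enables debug mode (automatic reloading and the interactive debugger), which is useful during development but must be turned off in production. The interactive debugger can execute arbitrary code, which is especially dangerous when the server listens on all interfaces.

Install required packages: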
pip install flask flask-httpauth pandas
Save the code in a file (e.g., app.py).
Run the Flask application:
python app.py
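The API will be accessible at http://127.0.0.1:5001/. No route is defined for / itself, so requests should target the /ingest and /data/<dataset> endpoints.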
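Because host, port, and debug are hardcoded in the __main__ block, the same settings ship to every environment. A common variation, sketched here with three hypothetical environment variables (API_HOST, API_PORT, API_DEBUG), reads them at startup and defaults to the safer choices: localhost only, port 5001, debug off.

```python
import os
from typing import Mapping


def run_settings(env: Mapping[str, str] = os.environ) -> dict:
    """Hypothetical helper: read host/port/debug for app.run() from the environment."""
    return {
        "host": env.get("API_HOST", "127.0.0.1"),  # localhost unless overridden
        "port": int(env.get("API_PORT", "5001")),
        # Debug mode is enabled only by an explicit opt-in value
        "debug": env.get("API_DEBUG", "").lower() in {"1", "true", "yes"},
    }
```

The entry point would then call app.run(**run_settings()), and a developer would opt in with something like API_DEBUG=1 python app.py.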
Now, let's break down the key features and their significance in a data engineering context:
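- Endpoints: POST /ingest for data ingestion and GET /data/<dataset> for data retrieval.
- Authentication: both endpoints require HTTP Basic credentials, checked by verify_password.
- Data cleaning: pandas parses timestamps, converts values to numbers, and drops rows with missing values before storage.
- Aggregation: stored records are grouped by day and summarized with mean, max, and min.
- Caching: responses are cached for 60 seconds on the /data/<dataset> endpoint.
- Logging and error handling: ingestion and retrieval are logged, and failures return JSON error messages with an HTTP status code.

To try the API, first ingest two records into a dataset named test, then request its daily aggregates. The Authorization header value is the Base64 encoding of admin:password123 (curl's -u admin:password123 option builds the same header).

```bash
curl -X POST http://127.0.0.1:5001/ingest \
  -H "Content-Type: application/json" \
  -H "Authorization: Basic YWRtaW46cGFzc3dvcmQxMjM=" \
  -d '{"dataset": "test", "records": [{"timestamp": "2023-05-01T12:00:00", "value": 10}, {"timestamp": "2023-05-01T13:00:00", "value": 15}]}'
```

```bash
curl http://127.0.0.1:5001/data/test \
  -H "Authorization: Basic YWRtaW46cGFzc3dvcmQxMjM="
```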
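The same two requests can be made from Python with only the standard library. This is a sketch: BASE_URL assumes the server runs locally on port 5001, and basic_auth_header, post_ingest, and fetch_daily_summary are hypothetical names. It also shows how the Basic credentials in the curl examples are encoded.

```python
import base64
import json
import urllib.parse
import urllib.request

BASE_URL = "http://127.0.0.1:5001"  # assumption: local server on port 5001


def basic_auth_header(username: str, password: str) -> str:
    """Build the value of an HTTP Basic Authorization header."""
    token = base64.b64encode(f"{username}:{password}".encode("utf-8")).decode("ascii")
    return f"Basic {token}"


def post_ingest(dataset: str, records: list, auth: str) -> dict:
    body = json.dumps({"dataset": dataset, "records": records}).encode("utf-8")
    req = urllib.request.Request(
        f"{BASE_URL}/ingest",
        data=body,  # supplying a body makes urlopen send a POST
        headers={"Content-Type": "application/json", "Authorization": auth},
    )
    with urllib.request.urlopen(req) as resp:
        return json.load(resp)


def fetch_daily_summary(dataset: str, auth: str) -> dict:
    # Quote the dataset name so unusual characters survive the URL path
    url = f"{BASE_URL}/data/{urllib.parse.quote(dataset, safe='')}"
    req = urllib.request.Request(url, headers={"Authorization": auth})
    with urllib.request.urlopen(req) as resp:
        return json.load(resp)
```

With the server running, auth = basic_auth_header("admin", "password123") reproduces the header from the curl commands, after which post_ingest("test", records, auth) and fetch_daily_summary("test", auth) mirror them. Note that urlopen raises urllib.error.HTTPError for 4xx and 5xx responses such as 401 or 404.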
This example showcases how Flask can be used to create a simple yet functional API for data ingestion and retrieval, incorporating several key concepts relevant to data engineering. It provides a foundation that can be extended for more complex data processing tasks and integrations with other systems.