要将 Kafka 的消息生产和消费转换为 API 接口,我们可以使用 Python 的 Web 框架。其中 Flask 是一个轻量级且易于使用的选择。下面是一个简单的例子,使用 Flask 创建 API 来生成和消费 Kafka 消息。
1. 安装所需的库:
pip install kafka-python flask
2. 创建 Flask API:
from flask import Flask, request, jsonify
from kafka import KafkaProducer, KafkaConsumer
app = Flask(__name__)
# 配置 Kafka
KAFKA_BROKER_URL = 'localhost:9092'
TOPIC = 'test_topic'
producer = KafkaProducer(bootstrap_servers=KAFKA_BROKER_URL)
@app.route('/send', methods=['POST'])
def send_message():
message = request.json.get('message')
if message:
producer.send(TOPIC, value=message.encode('utf-8'))
return jsonify({"status": "success", "message": "Message sent!"}), 200
else:
return jsonify({"status": "error", "message": "Message cannot be empty!"}), 400
@app.route('/receive', methods=['GET'])
def receive_message():
consumer = KafkaConsumer(TOPIC, bootstrap_servers=KAFKA_BROKER_URL, auto_offset_reset='earliest')
messages = []
for message in consumer:
messages.append(message.value.decode('utf-8'))
if len(messages) > 5: # 只收集最近的5条消息,可以根据需要调整
break
return jsonify(messages)
if __name__ == '__main__':
app.run(debug=True, port=5000)
这个 Flask 应用程序定义了两个端点:
/send
: 它接受 POST 请求并发送消息到 Kafka。/receive
: 它返回 Kafka 主题中的最近消息。3. 使用 API:
curl -X POST http://localhost:5000/send -H "Content-Type: application/json" -d '{"message": "Hello, Kafka!"}'
curl http://localhost:5000/receive
这只是一个简单的示例,您可能需要添加错误处理、日志记录、认证、消息序列化和反序列化等功能,以满足更复杂的需求。