• python 自建kafka消息生成和消费小工具


    要将 Kafka 的消息生产和消费转换为 API 接口,我们可以使用 Python 的 Web 框架。其中 Flask 是一个轻量级且易于使用的选择。下面是一个简单的例子,使用 Flask 创建 API 来生成和消费 Kafka 消息。

    1. 安装所需的库

    pip install kafka-python flask
    
    • 1

    2. 创建 Flask API

    from flask import Flask, request, jsonify
    from kafka import KafkaProducer, KafkaConsumer
    
    app = Flask(__name__)
    
    # 配置 Kafka
    KAFKA_BROKER_URL = 'localhost:9092'
    TOPIC = 'test_topic'
    producer = KafkaProducer(bootstrap_servers=KAFKA_BROKER_URL)
    
    
    @app.route('/send', methods=['POST'])
    def send_message():
        message = request.json.get('message')
        if message:
            producer.send(TOPIC, value=message.encode('utf-8'))
            return jsonify({"status": "success", "message": "Message sent!"}), 200
        else:
            return jsonify({"status": "error", "message": "Message cannot be empty!"}), 400
    
    
    @app.route('/receive', methods=['GET'])
    def receive_message():
        consumer = KafkaConsumer(TOPIC, bootstrap_servers=KAFKA_BROKER_URL, auto_offset_reset='earliest')
        messages = []
        for message in consumer:
            messages.append(message.value.decode('utf-8'))
            if len(messages) > 5:  # 只收集最近的5条消息,可以根据需要调整
                break
        return jsonify(messages)
    
    
    if __name__ == '__main__':
        app.run(debug=True, port=5000)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34

    这个 Flask 应用程序定义了两个端点:

    • /send: 它接受 POST 请求并发送消息到 Kafka。
    • /receive: 它返回 Kafka 主题中的最近消息。

    3. 使用 API

    • 发送消息:
    curl -X POST http://localhost:5000/send -H "Content-Type: application/json" -d '{"message": "Hello, Kafka!"}'
    
    • 1
    • 接收消息:
    curl http://localhost:5000/receive
    
    • 1

    这只是一个简单的示例,您可能需要添加错误处理、日志记录、认证、消息序列化和反序列化等功能,以满足更复杂的需求。

  • 相关阅读:
    Springboot整合JavaMail(发送邮件)
    Spring Data JPA之Spring boot整合JPA进行CRUD
    机器学习入门(三)多元线性回归
    最近遇到几个小问题总结
    HTTP状态码是什么?
    js input 正则保留2位小数中文拼音输入问题 + 限制输入整数的方案
    交换两个数值(不用第三个变量
    windows11使用docker部署安装minio
    docker build xxx --build-arg设置http_proxy环境变量
    什么?MySQL 8.0 会同时修改两个ib_logfilesN 文件?
  • 原文地址:https://blog.csdn.net/weixin_38328865/article/details/133075417