• 通过钩子函数+Traceid实现Flask链路追踪


    背景

    在flask web中我们通常需要一个traceid作为调用参数传递给全链路各个调用函数

    1. 需要针对一次请求创建一个唯一的traceid:这里用uuid去简化代替
    2. 我们需要保证traceid不被污染,在每个请求期间存在,在请求结束销毁且线程独立:这里通过flask中的g对象来存储线程内的数据
    3. 由于我们使用g对象来存储,那么当接口中发起新的请求时候,新请求会创建新的g对象,此时g对象为空,我们需要让traceid可以在多个请求中共享数据:这里通过请求头中增加traceid来传递

    实现

    首先定义二个主要函数

    • 定义一个请求开始的时候需要调用的函数,用于初始化traceid或者获取上一个请求中的traceid以及其他一些相关请求参数
    • 定义一个请求结束的时候需要调用的函数,用于请求结束的日志响应报文收尾记录。
    复制代码
    import requests
    from flask import request, g
    import time
    from flask import Flask
    
    def get_uuid():
        import uuid
        return str(uuid.uuid4()).replace('-', '')
    
    
    def trace_add_log_record(event_des='',msg_dict={},remarks=''):
        #trace_links_index每次调用+1
        request.trace_links_index = request.trace_links_index + 1
        logs = {
            'traceid': g.traceid,
            'trace_index': request.trace_links_index,
            'event_des': event_des,
            'msg_dict': msg_dict,
            'remarks': remarks
        }
        print(logs)
    
    def trace_start_log_record_handler():
        # 获取traceid,如果存在则使用,否则生成一个
        if "traceid" in request.headers:
            g.traceid =  request.headers['traceid']
        else:
            g.traceid = get_uuid()
        # 初始化trace_links_index
        request.trace_links_index = 0
        # 记录开始时间
        request.start_time = time.time()
        log_msg = {
            'headers': request.headers,
            'url': request.url,
            'method': request.method,
            "request_data": request.args if request.method == "GET" else request.get_json(),
            'ip': request.headers.get("X-Real-IP") or request.remote_addr,
            'start_time': request.start_time
        }
        # 记录日志
        trace_add_log_record(event_des='start', msg_dict=log_msg)
    
    def trace_end_log_record_handler(reponse):
        # 记录结束时间
        request.end_time = time.time()
        # 记录traceid到响应头
        reponse.headers.add('traceid', g.traceid)
        log_msg = {
            "end_time" : request.end_time,
            "cost_time": request.end_time - request.start_time,
            "status_code": reponse.status_code,
            "headers": reponse.headers,
            "response_data": reponse.data.decode('utf-8')
        }
        # 记录日志
        trace_add_log_record(event_des='end', msg_dict=log_msg)
    复制代码

    接着,我们通过钩子函数去触发我们上述所写的两个重要函数

    复制代码
    """
    写钩子函数在app中注册,还有一种写法
    @app.before_request
    def before_request2():
        print('before_request2')
    
    
    @app.after_request
    def after_request1(response):
        print('after_request1')
    
        return response
    """
    #写钩子函数在app中注册
    def register_handler(response):
        def before_request():
            trace_start_log_record_handler()
        def after_request(response):
            trace_end_log_record_handler(reponse=response)
            return response
        # 注册钩子函数
        response.before_request(before_request)
        response.after_request(after_request)
    复制代码

    最后写测试接口进行测试

    复制代码
    app = Flask(__name__)
    @app.route('/')
    def hello_world():
        return 'Hello, World!'
    
    @app.route('/test')
    def test():
        name = request.args.get('name')
        hello_world = requests.get('http://127.0.0.1:5000/', headers={'traceid': g.traceid})
        return name + hello_world.text
    
    @app.route('/test2', methods=['POST'])
    def test2():
        name = request.get_json().get('name')
        hello_world = requests.get('http://127.0.0.1:5000/', headers={'traceid': g.traceid})
        return name + hello_world.text
    
    if __name__ == '__main__':
    
        register_handler(app)
        app.run(debug=True)
    复制代码

    这样我们简单了完成了通过traceid把链路串联起来了

    {'traceid': 'a5637579351c477a80090a88f5347088', 'trace_index': 1, 'event_des': 'start', 'msg_dict': {'headers': EnvironHeaders([('User-Agent', 'Apifox/1.0.0 (https://apifox.com)'), ('Content-Type', 'application/json'), ('Accept', '*/*'), ('Host', '127.0.0.1:5000'), ('Accept-Encoding', 'gzip, deflate, br'), ('Connection', 'keep-alive')]), 'url': 'http://127.0.0.1:5000/test?name=yetangjian', 'method': 'GET', 'request_data': ImmutableMultiDict([('name', 'yetangjian')]), 'ip': '127.0.0.1', 'start_time': 1717311086.757312}, 'remarks': ''}
    {'traceid': 'a5637579351c477a80090a88f5347088', 'trace_index': 1, 'event_des': 'start', 'msg_dict': {'headers': EnvironHeaders([('Host', '127.0.0.1:5000'), ('User-Agent', 'python-requests/2.25.1'), ('Accept-Encoding', 'gzip, deflate'), ('Accept', '*/*'), ('Connection', 'keep-alive'), ('Traceid', 'a5637579351c477a80090a88f5347088')]), 'url': 'http://127.0.0.1:5000/', 'method': 'GET', 'request_data': ImmutableMultiDict([]), 'ip': '127.0.0.1', 'start_time': 1717311086.7663064}, 'remarks': ''}
    {'traceid': 'a5637579351c477a80090a88f5347088', 'trace_index': 2, 'event_des': 'end', 'msg_dict': {'end_time': 1717311086.7663064, 'cost_time': 0.0, 'status_code': 200, 'headers': Headers([('Content-Type', 'text/html; charset=utf-8'), ('Content-Length', '13'), ('traceid', 'a5637579351c477a80090a88f5347088')]), 'response_data': 'Hello, World!'}, 'remarks': ''}
    {'traceid': 'a5637579351c477a80090a88f5347088', 'trace_index': 2, 'event_des': 'end', 'msg_dict': {'end_time': 1717311086.7683115, 'cost_time': 0.010999441146850586, 'status_code': 200, 'headers': Headers([('Content-Type', 'text/html; charset=utf-8'), ('Content-Length', '23'), ('traceid', 'a5637579351c477a80090a88f5347088')]), 'response_data': 'yetangjianHello, World!'}, 'remarks': ''}

     

  • 相关阅读:
    shell的执行流控制
    session伪造
    centos安装git
    使用Vue-cli构建spa项目及结构解析
    【pulsar学习】kafka存在的问题与pulsar应用场景
    F28335-可移植新建工程模板-基于bitfield
    重温redis和mysql的数据一致性问题
    quilt基本使用
    关闭文件流
    Redis大白话(●三●)
  • 原文地址:https://www.cnblogs.com/yetangjian/p/18227138