• 快速实现用户认证:使用Python和Flask配合PyJWT生成与解密Token的教程及示例代码


    生成token 与解密 token 和 拦截器

    #学习交流 访问
    # https://v.iiar.cn
    
    import jwt
    import datetime
    from models import XUser
    from flask import request, jsonify
    from functools import wraps
    
    SECRET_KEY = 'XPay'
    
    
    # 创建token
    def generate_token(user_id):
        try:
            payload = {
                'exp': datetime.datetime.utcnow() + datetime.timedelta(days=7),
                'iat': datetime.datetime.utcnow(),
                'sub': user_id
            }
            return jwt.encode(
                payload,
                SECRET_KEY,  # 替换为你的密钥
                algorithm='HS256'
            )
        except Exception as e:
            return e
    
    
    # 解析token
    def decode_token(token):
        """ 解码Token并处理异常 """
        try:
            payload = jwt.decode(token, SECRET_KEY, algorithms=['HS256'])
            return payload['sub']  # 返回成功标志和用户ID
        except jwt.ExpiredSignatureError as e:
            return False
        except jwt.InvalidTokenError as e:
            return False
    
    
    # 查询用户状态
    def check_token_and_user_status(token):
        user_id = decode_token(token)
        print(user_id)
        token_in_user = XUser.query.filter_by(token=token).first()
    
        # 根据是否开启多端登陆判断token是否有效
        multi_device_login = True
        if not multi_device_login and not token_in_user:
            return False, 'token过期'
    
        if user_id:
            user = XUser.query.get(user_id)
            if user:
                if user.state == '正常':
                    return True, user
                else:
                    return False, '该用户状态异常,请联系客服'
            else:
                return False, '用户不存在'
        else:
            return False, 'token无效'
    
    
    def user_token_required(f):
        @wraps(f)
        def decorated_function(*args, **kwargs):
            token = request.headers.get('token')
            if not token:
                return jsonify({
                    'code': 401,
                    'data': '',
                    'msg': 'token不存在',
                    'time': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
                })
    
            user_state, user_info = check_token_and_user_status(token)
            if not user_state:
                return jsonify({
                    'code': 401,
                    'data': '',
                    'msg': user_info,
                    'time': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
                })
            return f(*args, **kwargs, user_info=user_info)
    
        return decorated_function
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89

    详细解释

    这段代码提供了一个使用 Python 和 Flask 结合 JWT (JSON Web Tokens) 进行用户认证的简单框架。它包括了生成 token、解码 token、检查用户状态和一个装饰器函数,用于保护需要认证的路由。下面是对代码的逐部分解释:

    1. generate_token(user_id) 函数

    • 这个函数用于为指定的用户 ID 生成一个 JWT token。Token 包含三个重要的信息(称为payload):exp(过期时间),iat(发行时间),和sub(主题,这里用作用户 ID)。
    • Token 使用 HS256 算法进行签名,SECRET_KEY作为签名密钥。
    • Token 默认有效期为 7 天。

    2. decode_token(token) 函数

    • 用于解码和验证接收到的 JWT token。如果 token 有效且未过期,这个函数返回保存在 token 的sub字段中的用户 ID。
    • 如果 token 过期(ExpiredSignatureError)或无效(InvalidTokenError),函数将返回 False。

    3. check_token_and_user_status(token) 函数

    • 首先解码 token 来获取用户 ID,然后查询数据库中是否存在对应的用户和 token。
    • 如果设置了不允许多端登录(multi_device_login = False),它还会检查数据库中的 token 是否与提供的 token 匹配。
    • 根据用户的状态(如是否正常)和 token 的有效性返回相应的状态和信息。

    4. user_token_required(f) 装饰器

    • 一个 Flask 路由装饰器,用于在执行实际路由函数之前进行用户认证。
    • 从请求头中获取 token,如果不存在 token 或 token 无效,则返回错误信息。
    • 如果用户认证成功,路由函数将正常执行。装饰器会将user_info(从check_token_and_user_status返回的用户信息)作为关键字参数传递给路由函数。

    使用场景

    • 在需要对用户身份进行验证的 Flask 路由上,可以使用@user_token_required装饰器来确保只有携带有效 token 的请求才能访问。
    • 这套系统使得管理用户会话和访问控制变得简单,特别是在构建 RESTful API 时非常有用。

    注意事项

    • 实际部署时,SECRET_KEY应当是一个安全的值,且不应该硬编码在代码中。
    • 在处理用户状态和多端登录逻辑时,需要根据实际业务需求进行调整。
    • XUser模型和数据库查询逻辑需要根据实际的数据库设计来实现。这里假设XUser是一个模型类,代表用户数据,且有一个state属性和一个token属性。

    模型部分

    class XUser(db.Model):
        id = db.Column(db.Integer, primary_key=True)
        mid = db.Column(db.Integer, comment='推荐人id')
        phone = db.Column(db.String(80), comment='手机号')
        user_name = db.Column(db.String(80), comment='用户昵称')
        openid = db.Column(db.String(120), comment='openid')
        password = db.Column(db.String(120), comment='密码')
        registration_time = db.Column(db.DateTime, default=datetime.now, comment='注册时间')
        token = db.Column(db.String(255), comment='Token')
        account_balance = db.Column(db.Numeric(20, 4), default=0.00, comment="账户余额")
        state = db.Column(db.String(120), default='正常', comment='用户状态')
        fee_percentage = db.Column(db.Integer, default=4, comment='手续费比例')
    
        def to_dict(self):
            return {
                'id': self.id,
                'mid': self.mid,
                'phone': self.phone,
                'user_name': self.user_name,
                'openid': self.openid,
                'registration_time': self.registration_time.strftime(
                    '%Y-%m-%d %H:%M:%S') if self.registration_time else None,
                'account_balance': float(self.account_balance) if self.account_balance is not None else None,
                'state': self.state,
                'fee_percentage': self.fee_percentage
            }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    模型解释

    这个XUser类是一个模型定义,使用 SQLAlchemy ORM (一个 Python SQL 工具包和对象关系映射器)用于定义和操作数据库中的用户表。每个属性(使用db.Column声明)对应用户表中的一个字段。以下是各个字段的解释:

    字段解释

    • id: 用户的唯一标识符,整数类型,设为主键。
    • mid: 推荐人的ID,整数类型,用来表示这个用户是被哪个已存在的用户推荐的。
    • phone: 用户的手机号,字符串类型,最长80字符。
    • user_name: 用户昵称,字符串类型,最长80字符。
    • openid: 用户的微信OpenID,字符串类型,最长120字符。这是微信平台特有的标识用户的ID,用于识别微信用户。
    • password: 用户密码,字符串类型,最长120字符。在实际应用中,密码应该经过加密存储。
    • registration_time: 用户注册时间,DateTime类型,默认值为当前时间。这里使用了datetime.now来自动设置这个字段的值。
    • token: 用于认证的Token,字符串类型,最长255字符。这是在用户登录时生成的,用于后续请求的身份验证。
    • account_balance: 用户账户余额,数值类型,最多20位数字,小数点后最多4位。默认值为0.00。
    • state: 用户状态,字符串类型,最长120字符,默认值为’正常’。这个字段可以用来表示用户是否被禁用或其他状态。
    • fee_percentage: 手续费比例,整数类型。表示在进行某些交易时需要收取的手续费百分比,默认值为4%。

    to_dict 方法

    to_dict方法是一个实例方法,用于将XUser对象的属性转换成一个字典,这在进行JSON序列化时非常有用,例如在 RESTful API 响应中返回用户信息。这个方法特别处理了registration_timeaccount_balance字段,确保它们以适当的格式输出(registration_time格式化为字符串,account_balance确保为浮点数或None)。

    总结

    XUser类不仅定义了数据库表结构,还提供了方便的方法来操作和转换用户数据。通过使用 SQLAlchemy ORM,可以简化数据库操作,提高代码的可读性和可维护性。

    用户注册/登陆/获取用户基本信息

    from datetime import datetime
    from models import db, XUser
    from flask import request, jsonify, Blueprint
    
    from tools.common_method import check_password, set_password
    from tools.user_token import generate_token
    from tools.user_token import user_token_required
    
    user_api = Blueprint('user_api', __name__)
    
    
    # 登陆
    @user_api.route('/api/user/login', methods=['POST'])
    def user_login():
        data = request.json
        phone = data.get('phone')
        password = data.get('password')
        user = XUser.query.filter_by(phone=phone).first()
        print(phone)
        if not user:
            return jsonify({
                'code': 403,
                'data': '',
                'msg': '用户账号或密码错误',
                'time': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            })
        else:
            if check_password(user.password, password) and user.state == '正常':
                token = generate_token(user.id)
                user.token = token
                db.session.commit()
                return jsonify({
                    'code': 200,
                    'token': token,
                    'msg': '登陆成功',
                    'time': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
                })
            else:
                return jsonify({
                    'code': 403,
                    'data': '',
                    'msg': '该用户状态不允许登陆',
                    'time': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
                })
    
    
    # 注册
    @user_api.route('/api/user/register', methods=['POST'])
    def user_register():
        data = request.json
        phone = data.get('phone')
        password = data.get('password')
        user_name = data.get('user_name')
        password_len = len(password)
        phone_len = len(phone)
        if phone_len != 11:
            return jsonify({
                'code': 403,
                'data': '',
                'msg': '注册失败,请输入11位手机号',
                'time': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            })
        if password_len < 6 or password_len > 32:
            return jsonify({
                'code': 403,
                'data': '',
                'msg': '注册失败,密码长度请大于6位,小于32位',
                'time': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            })
        user = XUser.query.filter_by(phone=phone).first()
        if user:
            return jsonify({
                'code': 403,
                'data': '',
                'msg': '注册失败,该手机已经注册,请直接登陆或更换手机后注册',
                'time': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            })
        else:
            new_password = set_password(password)
            new_user = XUser(
                phone=phone,
                password=new_password,
                user_name=user_name
            )
            db.session.add(new_user)
            db.session.commit()
            token = generate_token(new_user.id)
            new_user.token = token
            db.session.commit()
            return jsonify({
                'code': 200,
                'token': token,
                'msg': '注册成功',
                'time': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            })
    
    
    @user_api.route('/api/user/user_info', methods=['GET'])
    @user_token_required
    def get_user_info(user_info):
        return reg_func(200, user_info.to_dict(), '获取信息成功')
    
    
    def reg_func(code, data, msg):
        return jsonify({
            'code': code,
            'data': data,
            'msg': msg,
            'time': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        }), code
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111

    注册/登陆/获取用户信息代码详细解释

    这段代码是一个使用 Flask 框架构建的简易用户管理系统的一部分,包括用户的登录、注册以及获取用户信息的功能。它利用了 Flask 的 Blueprint 功能来组织和注册相关的路由,以及 SQLAlchemy ORM 来处理数据库操作。下面是对代码主要部分的解释:

    登录 (user_login)

    • 接收客户端发送的 JSON 数据,包含用户的phonepassword
    • 通过手机号查询数据库中的用户。
    • 如果用户不存在,返回403错误和消息"用户账号或密码错误"
    • 如果用户存在,调用check_password函数验证密码是否正确,并检查用户状态是否为"正常"
    • 如果验证通过,为用户生成一个新的 token(使用generate_token函数),更新用户的 token 字段,并提交到数据库。
    • 返回包含新 token 和成功消息的 JSON 响应。

    注册 (user_register)

    • 同样接收包含phonepassworduser_name的 JSON 数据。
    • 验证手机号是否为11位,密码长度是否在6到32位之间。
    • 如果手机号或密码格式不符合要求,返回403错误和相应的失败消息。
    • 如果手机号已经被注册,返回403错误和消息"注册失败,该手机已经注册,请直接登陆或更换手机后注册"
    • 如果验证通过,使用set_password函数加密密码,然后创建一个新的XUser实例,并将其添加到数据库。
    • 为新用户生成 token,更新用户的 token 字段,并提交到数据库。
    • 返回包含新 token 和成功消息的 JSON 响应。

    获取用户信息 (get_user_info)

    • 这个路由使用@user_token_required装饰器,要求请求必须包含有效的 token。
    • 如果认证通过,装饰器会传递user_info参数(用户信息)给get_user_info函数。
    • 函数调用reg_func生成标准的 JSON 响应,包含用户信息和成功消息。

    辅助函数 (reg_func)

    • reg_func是一个帮助函数,用于生成标准化的 JSON 响应。它接收状态码、数据和消息作为参数,并返回一个 Flask jsonify 响应。

    关键工具和方法

    • Blueprint:用于创建一组相关的路由和视图函数。
    • request.json:用于获取 JSON 格式的请求体数据。
    • jsonify:将数据转换为 JSON 响应。
    • db.session:用于数据库操作,如添加新记录和提交更改。
    • generate_tokenuser_token_required:自定义函数和装饰器,用于处理 JWT token 的生成和验证。

    整个代码展示了如何在 Flask 应用中实现用户认证和管理的基本流程,使用 JWT tokens 提供安全的用户状态管理和接口保护。

  • 相关阅读:
    Docker(13)-- Docker 网络
    初识Maven(一)命令行操作和idea创建maven工程
    springboot常见网络相关错误及原因解析
    《10人以下小团队管理手册》推荐书评
    深入源码剖析String类为什么不可变?(还不明白就来打我)
    Blender快捷键
    JS 字符串转 GBK 编码超精简实现
    基于JavaSwing开发通讯录管理系统+开题报告+论文 毕业设计 课程设计 大作业
    史上第三大收购案,博通以 610 亿美元收购 VMware
    第二十一章 数据库连接池,过滤器
  • 原文地址:https://blog.csdn.net/weixin_40541330/article/details/136140355