• fastapi_No.21_安全性_目录权限认证



    在实际开发中,每个用户的权限是不同的。通俗的讲就是不同的用户可以访问不同的路径。
    在fastapi中主要是通过OAuth2 scopes来实现的。
    下面是代码实现过程的示例。

    后端实现步骤

    第一步:指定登录路径

    from fastapi.security import OAuth2PasswordBearer
    
    # 指定登录路径
    oauth2_scheme = OAuth2PasswordBearer(
    	tokenUrl="token",
        scopes={
            "me":"Read information about the current user",
            'items':'Read items'
        }
    )
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    第二步:创建登录路径及其路径操作函数

    from fastapi import FastAPI,Depends
    from fastapi.security import OAuth2PasswordRequestForm
    from pydantic import BaseModel
    from jose import JWSError,jwt
    from typing import Union,List
    from datetime import datetime,timedelta
    from passlib.context import CryptContext
    
    #用于加密的密钥和算法及用户token过期时间
    SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
    ALGORITHM = "HS256"
    ACCESS_TOKEN_EXPIRE_MINUTES = 30
    
    # 定义加密模式
    pwd_context = CryptContext(schemes=["bcrypt"],deprecated='auto')
    
    # 定义token回复模型
    class Token(BaseModel):
        access_token:str
        token_type:str
    
    # 定义用户数据模型
    class User(BaseModel):
        username:str
        email:Union[str,None] = None
        full_name:Union[str,None] = None
        disabled:Union[bool,None] = None
    
    class UserInDB(User):
        hashed_password:str
    
    # 定义根据用户名获取数据库用户信息
    def get_user(db,username:str):
        if username in db:
            user_dict = db[username]
            return UserInDB(**user_dict)
    
    # 验证密码及hash密码是否一致
    def verify_password(plain_pwd,hashed_pwd):
        return pwd_context.verify(plain_pwd,hashed_pwd)
    
    # 定义认证用户方法
    def authenticate_user(db,username:str,password:str):
        user = get_user(db=db,username=username)
        if not user:
            return False
        if not verify_password(password,user.hashed_password):
            return False
        return user
    
    # 定义创建token方法
    def create_access_token(data:dict,expires:Union[timedelta,None]=None):
        to_encode = data.copy()
        if expires:
            expire = datetime.now() + expires
        else:
            expire = datetime.now() + timedelta(minutes=15)
        to_encode.update({"exp":expire})
        #生成jwt token
        encoded_jwt = jwt.encode(to_encode,SECRET_KEY,algorithm=ALGORITHM)
        return encoded_jwt
        
    # 创建FastAPI实例对象
    app = FastAPI()
    
    # 定义/token路径装饰器及路径操作函数
    @app.post("/token",response_model=Token)
    async def login_for_access_token(form_data:OAuth2PasswordRequestForm=Depends()):
        user = authenticate_user(fake_users_db,form_data.username,form_data.password)
        if not user:
            raise HTTPException(status_code=400,detail="Incorrent username or password")
        access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
        access_token = create_access_token(
        	# 此处是为了模拟路径权限,直接在form表单中传入了用户的路径权限scopes
        	# 实际应用中应根据用户信息,从数据库中获取scopes
            data={"sub":user.username,"scopes":form_data.scopes},
            expires=access_token_expires
        )
        return {"access_token":access_token,"token_type":"bearer"}
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79

    第三步:创建根据token获取用户及其权限信息

    # 引入SecurityScopes
    from fastapi.security import SecurityScopes
    from fastapi import HTTPException,status
    
    fake_users_db = {
        "johndoe": {
            "username": "johndoe",
            "full_name": "John Doe",
            "email": "johndoe@example.com",
            "hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",
            "disabled": False,
        },
        "alice": {
            "username": "alice",
            "full_name": "Alice Chains",
            "email": "alicechains@example.com",
            "hashed_password": "$2b$12$gSvqqUPvlXP2tfVFaWK1Be7DlH.PKZbv5H8KnzzVgXXbVxpva.pFm",
            "disabled": True,
        },
    }
    
    # 定义token数据模型
    class TokenData(BaseModel):
        username:Union[str,None] = None
        scopes:List[str] = []
    
    # 定义根据token获取用户信息及路径权限的方法
    # 这里的security_scopes表示用户访问时需要的权限名称
    async def get_current_user(
        security_scopes:SecurityScopes,token:str=Depends(oauth2_scheme)
    ):
        # 获取scopes
        if security_scopes.scopes:
            authenticate_value = f'Bearer scope="{security_scopes.scope_str}"'
        else:
            authenticate_value = f"Bearer"
        # 定义认证失败的报错
        credentials_exception = HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail='Could not validate credentials',
            headers={"WWW-Authenticate":authenticate_value}
        )
        try:
            # jwt解密token
            playload = jwt.decode(token,SECRET_KEY,algorithms=ALGORITHM)
            username = playload.get("sub")
            if username is None:
                raise credentials_exception
            token_scopes = playload.get("scopes",[])
            token_data = TokenData(username=username,scopes=token_scopes)
        except(JWSError,ValidationError):
            raise credentials_exception
        user = get_user(db=fake_users_db,username=token_data.username)
        if user is None:
            raise credentials_exception
        for scope in security_scopes.scopes:
            if scope not in token_data.scopes:
                raise HTTPException(
                    status_code=status.HTTP_401_UNAUTHORIZED,
                    detail="Not enough permissions",
                    headers={"WWW-Authenticate":authenticate_value}
                )
        return user
    
    # 从fastapi中引入Security
    # Security是Depends的子类,具有scopes属性,并将scopes属性传入依赖项中,表示访问的路径
    from fastapi import Security
    # 验证用户是否处于激活状态
    async def get_current_active_user(
        current_user:User = Security(get_current_user,scopes=['me'])
    ):
        if current_user.disabled:
            raise HTTPException(status_code=400,detail="Inactive user")
        return current_user
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74

    第四步:路径修饰器及路径操作函数

    @app.get("/users/me",response_model=User)
    async def read_users_me(current_user:User=Depends(get_current_active_user)):
        return current_user
    
    @app.get("/users/me/items/")
    async def read_own_items(
        current_user: User = Security(get_current_active_user, scopes=["items"])
    ):
        return [{"item_id": "Foo", "owner": current_user.username}]
    
    @app.get("/status/")
    async def read_system_status(current_user: User = Depends(get_current_user)):
        return {"status": "ok"}
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    第五步:运行服务器

    if __name__ == '__main__':
        import uvicorn
        uvicorn.run(app='test:app',host='127.0.0.1',port=8080,reload=True)
    
    • 1
    • 2
    • 3

    完整后端代码

    # oauth2 区域认证用来给用户或app特定的权限。
    # oauth2中的区域可以理解为string类型的列表,且列表中每个string不能包含空格
    
    from datetime import datetime,timedelta
    from typing import List,Union
    
    from fastapi import Depends,FastAPI,HTTPException,Security,status
    from fastapi.security import OAuth2PasswordBearer,OAuth2PasswordRequestForm,SecurityScopes
    
    from jose import JWSError,jwt
    from passlib.context import CryptContext
    from pydantic import BaseModel,ValidationError
    
    
    #用于加密的密钥和算法及用户token过期时间
    SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
    ALGORITHM = "HS256"
    ACCESS_TOKEN_EXPIRE_MINUTES = 30
    
    # 模拟数据库中存储的用户数据
    fake_users_db = {
        "johndoe": {
            "username": "johndoe",
            "full_name": "John Doe",
            "email": "johndoe@example.com",
            "hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",
            "disabled": False,
        },
        "alice": {
            "username": "alice",
            "full_name": "Alice Chains",
            "email": "alicechains@example.com",
            "hashed_password": "$2b$12$gSvqqUPvlXP2tfVFaWK1Be7DlH.PKZbv5H8KnzzVgXXbVxpva.pFm",
            "disabled": True,
        },
    }
    
    # 创建token返回数据模型
    class Token(BaseModel):
        access_token:str
        token_type:str
    
    # 创建token解析数据模型
    class TokenData(BaseModel):
        username:Union[str,None] = None
        scopes:List[str] = []
    
    # 创建用户数据模型
    class User(BaseModel):
        username:str
        email:Union[str,None] = None
        full_name:Union[str,None] = None
        disabled:Union[bool,None] = None
    
    class UserInDB(User):
        hashed_password:str
    
    # 定义加密模式
    pwd_context = CryptContext(schemes=["bcrypt"],deprecated='auto')
    
    # 定义登录地址及用户权限。
    # 此处设定scopes主要作用是用在docs中显示可选的几个scope,实际开发中可以不添加
    oauth2_scheme = OAuth2PasswordBearer(
        tokenUrl="token",
        scopes={
            "me":"Read information about the current user",
            'items':'Read items'
        }
    )
    
    app = FastAPI()
    
    # 验证密码是否正确
    def verify_password(plain_pwd,hashed_pwd):
        return pwd_context.verify(plain_pwd,hashed_pwd)
    
    # 生成加密的密码
    def get_hashed_password(password):
        return pwd_context.hash(password)
    
    # 根据用户名在数据库中获取用户信息
    def get_user(db,username:str):
        if username in db:
            user_dict = db[username]
            return UserInDB(**user_dict)
    
    # 用户登录认证,验证用户名是否存在,密码是否正确
    def authenticate_user(db,username:str,password:str):
        user = get_user(db=db,username=username)
        if not user:
            return False
        if not verify_password(password,user.hashed_password):
            return False
        return user
    
    # 创建token信息
    def create_access_token(data:dict,expires:Union[timedelta,None]=None):
        to_encode = data.copy()
        if expires:
            expire = datetime.now() + expires
        else:
            expire = datetime.now() + timedelta(minutes=15)
        to_encode.update({"exp":expire})
        #生成jwt token
        encoded_jwt = jwt.encode(to_encode,SECRET_KEY,algorithm=ALGORITHM)
        return encoded_jwt
    
    # 解析token,获取用户相关信息(user,scopes等,根据定义来获得)
    # security_scopes表示被访问的路径所需的权限
    async def get_current_user(
        security_scopes:SecurityScopes,token:str=Depends(oauth2_scheme)
    ):
        # 获取scopes
        if security_scopes.scopes:
            authenticate_value = f'Bearer scope="{security_scopes.scope_str}"'
        else:
            authenticate_value = f"Bearer"
        # 定义认证失败的报错
        credentials_exception = HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail='Could not validate credentials',
            headers={"WWW-Authenticate":authenticate_value}
        )
        try:
            # jwt解密token
            playload = jwt.decode(token,SECRET_KEY,algorithms=ALGORITHM)
            # 获取token中的用户名信息,sub是根据后面的/token路径操作函数中定义得来
            username = playload.get("sub")
            if username is None:
                raise credentials_exception
            # 获取token中的scopes信息,scopes是根据后面的/token路径操作函数中定义得来
            token_scopes = playload.get("scopes",[])
            token_data = TokenData(username=username,scopes=token_scopes)
        except(JWSError,ValidationError):
            raise credentials_exception
        user = get_user(db=fake_users_db,username=token_data.username)
        if user is None:
            raise credentials_exception
        # 检查该路径访问所需的权限名称,在该用户token包含的scopes中是否都具有,否则报错
        for scope in security_scopes.scopes:
            if scope not in token_data.scopes:
                raise HTTPException(
                    status_code=status.HTTP_401_UNAUTHORIZED,
                    detail="Not enough permissions",
                    headers={"WWW-Authenticate":authenticate_value}
                )
        return user
    
    # 定义获取当前激活用户的依赖函数
    async def get_current_active_user(
        # Security是Depends的子类,用来表示该路径所需的权限名称
        current_user:User = Security(get_current_user,scopes=['me'])
    ):
        if current_user.disabled:
            raise HTTPException(status_code=400,detail="Inactive user")
        return current_user
    
    # 定义登录接口,返回用户的token
    @app.post("/token",response_model=Token)
    async def login_for_access_token(form_data:OAuth2PasswordRequestForm=Depends()):
        user = authenticate_user(fake_users_db,form_data.username,form_data.password)
        if not user:
            raise HTTPException(status_code=400,detail="Incorrent username or password")
        access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
        access_token = create_access_token(
            data={"sub":user.username,"scopes":form_data.scopes},
            expires=access_token_expires
        )
        return {"access_token":access_token,"token_type":"bearer"}
    
    # 定义需要me权限的访问路径
    @app.get("/users/me",response_model=User)
    async def read_users_me(current_user:User=Depends(get_current_active_user)):
        return current_user
    
    # 定义需要me和items权限的访问路径
    @app.get("/users/me/items/")
    async def read_own_items(
        current_user: User = Security(get_current_active_user, scopes=["items"])
    ):
        return [{"item_id": "Foo", "owner": current_user.username}]
    
    # 定义需要用户登录的访问路径
    @app.get("/status/")
    async def read_system_status(current_user: User = Depends(get_current_user)):
        return {"status": "ok"}
    
    if __name__ == '__main__':
        import uvicorn
        uvicorn.run(app='oath2:app',host='127.0.0.1',port=8080,reload=True)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152
    • 153
    • 154
    • 155
    • 156
    • 157
    • 158
    • 159
    • 160
    • 161
    • 162
    • 163
    • 164
    • 165
    • 166
    • 167
    • 168
    • 169
    • 170
    • 171
    • 172
    • 173
    • 174
    • 175
    • 176
    • 177
    • 178
    • 179
    • 180
    • 181
    • 182
    • 183
    • 184
    • 185
    • 186
    • 187
    • 188
    • 189
    • 190

    前端验证

    登录

    登录界面username,password,scope是OAuth2PasswordRequestForm中限定的,必须用对应的名称。
    scope表示该用户拥有的权限名称,存在多个时,用空格分开。
    此处限定该用户有访问me权限。
    在这里插入图片描述

    访问me权限路径

    利用登录中获得的token信息,输入认证中心。
    可以获得响应的响应结果。
    在这里插入图片描述

    访问items权限路径

    由于该用户没有items权限,所以显示没有权限访问。
    在这里插入图片描述

    访问无权限限制路径

    由于status路径没有权限设定,故该用户可以访问成功。
    在这里插入图片描述

  • 相关阅读:
    windows xrdp 到 ubuntu 的一些问题记录
    Spring Boot 接口一个 JSON 字符串用两个对象去接收,这能行吗?
    基于低代码开发平台+BPM流程管理打造的行业KM解决方案
    检验检疫系统(LIS)源码:C# + MVC + SQLserver + Redis
    CopyOnWriteArrayList源码分析
    Windows消息种类
    【Educoder离散数学实训】计算机问题求解之递归 ※
    YOLO训练KITTI数据集(二):KITTI数据集的标签转换成YOLOv5所需要的标签格式并进行分割
    深度解析Sora的核心技术
    如何完美解决Sqoop导入导出MySQL数据错位问题
  • 原文地址:https://blog.csdn.net/baidu_38766791/article/details/127700309