• FastAPI 学习之路(二十八)使用密码和 Bearer 的简单 OAuth2


    OAuth2 规定在使用(我们打算用的)「password 流程」时,客户端/用户必须将 username 和 password 字段作为表单数据发送。我们看下在我们应该去如何实现呢。

    我们写一个登录接口,默认返回token和token_type

    1. from fastapi import FastAPI, Depends, status, HTTPException
    2. from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
    3. from pydantic import BaseModel
    4. from typing import Optional
    5. oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
    6. fake_db_users ={
    7. "mrli": {
    8. "username": "mrli",
    9. "full_name": "mrli_hanjing",
    10. "email": "mrli@qq.com",
    11. "hashed_password": "mrli",
    12. "disabled": False
    13. }
    14. }
    15. app = FastAPI()
    16. def fake_hash_password(password: str):
    17. """模拟将密码加密"""
    18. return password
    19. class User(BaseModel):
    20. username: str
    21. email: Optional[str] = None
    22. full_name: Optional[str] = None
    23. disabled: Optional[bool] = None
    24. class UserInDB(User):
    25. hashed_password: str
    26. def get_user(db_users, username: str):
    27. if username in db_users:
    28. user_dict = db_users[username]
    29. return UserInDB(**user_dict)
    30. def fake_decode_token(token):
    31. """我们模拟返回的token值就是username,所以下面可以直接传token"""
    32. user = get_user(fake_db_users, token)
    33. return user
    34. def get_current_user(token: str = Depends(oauth2_scheme)):
    35. user = fake_decode_token(token)
    36. if not user:
    37. raise HTTPException(
    38. status_code=status.HTTP_401_UNAUTHORIZED,
    39. detail="Invalid authentication",
    40. headers={"WWW-Autehticated": "Bearer"}
    41. )
    42. return user
    43. @app.post("/login")
    44. def login(form_data: OAuth2PasswordRequestForm = Depends()):
    45. """
    46. 校验密码
    47. 目前我们已经从数据库中获取了用户数据,但尚未校验密码。
    48. 让我们首先将这些数据放入 Pydantic UserInDB 模型中。
    49. 永远不要保存明文密码,因此,我们将使用(伪)哈希密码系统。
    50. 如果密码不匹配,我们将返回同一个错误。
    51. """
    52. user_dict = fake_db_users.get(form_data.username)
    53. if not user_dict:
    54. raise HTTPException(status_code=400, detail="Invalid username or password")
    55. user = UserInDB(**user_dict)
    56. hashed_password = fake_hash_password(form_data.password)
    57. if hashed_password != user.hashed_password:
    58. raise HTTPException(status_code=400, detail="Invalid username or password")
    59. return {"access_token": user.username, "token_type": "bearer"}
    60. @app.get("/users/me")
    61. def read_users_me(current_user: User = Depends(get_current_user)):
    62. return current_user
    63. if __name__ == '__main__':
    64. import uvicorn
    65. uvicorn.run("main:app", reload=True, debug=True)

     我们测试下登录接口

    接下来再测试下带认证的/users/me接口 (我们发现不能通过)

     接下来我们带上认证(校验通过,返回了我们想要的数据) 

    在我们的代码中,有这样一句:

    UserInDB(**user_dict)

     作用是直接将user_dict的键和值作为关键字参数传递,也就是python的解包,等同于:

    UserInDB(
        username = user_dict["username"],
        email = user_dict["email"],
        full_name = user_dict["full_name"],
        disabled = user_dict["disabled"],
        hashed_password = user_dict["hashed_password"],
    )

    假如我们把user的状态disabled改为True 

    fake_db_users ={
        "mrli": {
            "username": "mrli",
            "full_name": "mrli_hanjing",
            "email": "mrli@qq.com",
            "hashed_password": "mrli",
            "disabled": False
        }
    }

    我们不想让disabled为True的用户获取信息,那么我们如何实现呢?

    1. from fastapi import FastAPI, Depends, status, HTTPException
    2. from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
    3. from pydantic import BaseModel
    4. from typing import Optional
    5. oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
    6. fake_db_users ={
    7. "mrli": {
    8. "username": "mrli",
    9. "full_name": "mrli_hanjing",
    10. "email": "mrli@qq.com",
    11. "hashed_password": "mrli",
    12. "disabled": True
    13. }
    14. }
    15. app = FastAPI()
    16. def fake_hash_password(password: str):
    17. """模拟将密码加密"""
    18. return password
    19. class User(BaseModel):
    20. username: str
    21. email: Optional[str] = None
    22. full_name: Optional[str] = None
    23. disabled: Optional[bool] = None
    24. class UserInDB(User):
    25. hashed_password: str
    26. def get_user(db_users, username: str):
    27. if username in db_users:
    28. user_dict = db_users[username]
    29. return UserInDB(**user_dict)
    30. def fake_decode_token(token):
    31. """我们模拟返回的token值就是username,所以下面可以直接传token"""
    32. user = get_user(fake_db_users, token)
    33. return user
    34. def get_current_user(token: str = Depends(oauth2_scheme)):
    35. user = fake_decode_token(token)
    36. if not user:
    37. raise HTTPException(
    38. status_code=status.HTTP_401_UNAUTHORIZED,
    39. detail="Invalid authentication",
    40. headers={"WWW-Autehticated": "Bearer"}
    41. )
    42. return user
    43. def get_current_active_user(current_user: User = Depends(get_current_user)):
    44. if current_user.disabled:
    45. raise HTTPException(status_code=400, detail="Inactive user")
    46. return current_user
    47. @app.post("/login")
    48. def login(form_data: OAuth2PasswordRequestForm = Depends()):
    49. """
    50. 校验密码
    51. 目前我们已经从数据库中获取了用户数据,但尚未校验密码。
    52. 让我们首先将这些数据放入 Pydantic UserInDB 模型中。
    53. 永远不要保存明文密码,因此,我们将使用(伪)哈希密码系统。
    54. 如果密码不匹配,我们将返回同一个错误。
    55. """
    56. user_dict = fake_db_users.get(form_data.username)
    57. if not user_dict:
    58. raise HTTPException(status_code=400, detail="Invalid username or password")
    59. user = UserInDB(**user_dict)
    60. hashed_password = fake_hash_password(form_data.password)
    61. if hashed_password != user.hashed_password:
    62. raise HTTPException(status_code=400, detail="Invalid username or password")
    63. return {"access_token": user.username, "token_type": "bearer"}
    64. @app.get("/users/me")
    65. def read_users_me(current_user: User = Depends(get_current_active_user)):
    66. return current_user
    67. if __name__ == '__main__':
    68. import uvicorn
    69. uvicorn.run("main:app", reload=True, debug=True)

    其实很简单,我们就是在获取用户信息依赖的基础上增加了另一个是否是active的判断的依赖。

    def get_current_active_user(current_user: User = Depends(get_current_user)):
        if current_user.disabled:
            raise HTTPException(status_code=400, detail="Inactive user")
        return current_user
    
    @app.get("/users/me")
    def read_users_me(current_user: User = Depends(get_current_active_user)):
       
        return current_user

  • 相关阅读:
    服务容错之限流之 Tomcat 限流 & Tomcat 线程池的拒绝策略
    【Python(一)】环境搭建之Anaconda安装
    Elsevier出版社 | 优质好刊合集
    Docker Jenkins(改错版本)
    【Unity3D】Unity 游戏画面帧更新 ( 游戏物体 GameObject 移动 | 借助 Time.deltaTime 进行匀速运动 )
    Go与数据库:NoSQL数据库的应用
    微软Power Platform平台低代码
    代码随想录算法训练营Day 55 || 583. 两个字符串的删除操作、72. 编辑距离
    Linux学习系列:在CentOS 7上切换WiFi节点
    ReentrantLock、ReentrantReadWriteLock、StampedLock
  • 原文地址:https://blog.csdn.net/myli_binbin/article/details/126614376