OAuth2 规定在使用(我们打算用的)「password 流程」时,客户端/用户必须将 username 和 password 字段作为表单数据发送。我们看下在我们应该去如何实现呢。
我们写一个登录接口,默认返回token和token_type
- from fastapi import FastAPI, Depends, status, HTTPException
- from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
-
- from pydantic import BaseModel
- from typing import Optional
-
- oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
-
- fake_db_users ={
- "mrli": {
- "username": "mrli",
- "full_name": "mrli_hanjing",
- "email": "mrli@qq.com",
- "hashed_password": "mrli",
- "disabled": False
- }
- }
-
- app = FastAPI()
-
- def fake_hash_password(password: str):
- """模拟将密码加密"""
- return password
-
-
- class User(BaseModel):
- username: str
- email: Optional[str] = None
- full_name: Optional[str] = None
- disabled: Optional[bool] = None
-
-
- class UserInDB(User):
- hashed_password: str
-
- def get_user(db_users, username: str):
- if username in db_users:
- user_dict = db_users[username]
- return UserInDB(**user_dict)
-
- def fake_decode_token(token):
- """我们模拟返回的token值就是username,所以下面可以直接传token"""
- user = get_user(fake_db_users, token)
- return user
-
- def get_current_user(token: str = Depends(oauth2_scheme)):
- user = fake_decode_token(token)
- if not user:
- raise HTTPException(
- status_code=status.HTTP_401_UNAUTHORIZED,
- detail="Invalid authentication",
- headers={"WWW-Autehticated": "Bearer"}
- )
- return user
-
-
- @app.post("/login")
- def login(form_data: OAuth2PasswordRequestForm = Depends()):
- """
- 校验密码
- 目前我们已经从数据库中获取了用户数据,但尚未校验密码。
- 让我们首先将这些数据放入 Pydantic UserInDB 模型中。
- 永远不要保存明文密码,因此,我们将使用(伪)哈希密码系统。
- 如果密码不匹配,我们将返回同一个错误。
- """
- user_dict = fake_db_users.get(form_data.username)
- if not user_dict:
- raise HTTPException(status_code=400, detail="Invalid username or password")
- user = UserInDB(**user_dict)
- hashed_password = fake_hash_password(form_data.password)
- if hashed_password != user.hashed_password:
- raise HTTPException(status_code=400, detail="Invalid username or password")
- return {"access_token": user.username, "token_type": "bearer"}
-
- @app.get("/users/me")
- def read_users_me(current_user: User = Depends(get_current_user)):
- return current_user
-
-
- if __name__ == '__main__':
- import uvicorn
- uvicorn.run("main:app", reload=True, debug=True)
我们测试下登录接口

接下来再测试下带认证的/users/me接口 (我们发现不能通过)

接下来我们带上认证(校验通过,返回了我们想要的数据)
在我们的代码中,有这样一句:
UserInDB(**user_dict)
作用是直接将user_dict的键和值作为关键字参数传递,也就是python的解包,等同于:
UserInDB(
username = user_dict["username"],
email = user_dict["email"],
full_name = user_dict["full_name"],
disabled = user_dict["disabled"],
hashed_password = user_dict["hashed_password"],
)
假如我们把user的状态disabled改为True
fake_db_users ={
"mrli": {
"username": "mrli",
"full_name": "mrli_hanjing",
"email": "mrli@qq.com",
"hashed_password": "mrli",
"disabled": False
}
}
我们不想让disabled为True的用户获取信息,那么我们如何实现呢?
- from fastapi import FastAPI, Depends, status, HTTPException
- from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
-
- from pydantic import BaseModel
- from typing import Optional
-
- oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
-
- fake_db_users ={
- "mrli": {
- "username": "mrli",
- "full_name": "mrli_hanjing",
- "email": "mrli@qq.com",
- "hashed_password": "mrli",
- "disabled": True
- }
- }
-
- app = FastAPI()
-
- def fake_hash_password(password: str):
- """模拟将密码加密"""
- return password
-
-
- class User(BaseModel):
- username: str
- email: Optional[str] = None
- full_name: Optional[str] = None
- disabled: Optional[bool] = None
-
-
- class UserInDB(User):
- hashed_password: str
-
- def get_user(db_users, username: str):
- if username in db_users:
- user_dict = db_users[username]
- return UserInDB(**user_dict)
-
- def fake_decode_token(token):
- """我们模拟返回的token值就是username,所以下面可以直接传token"""
- user = get_user(fake_db_users, token)
- return user
-
- def get_current_user(token: str = Depends(oauth2_scheme)):
- user = fake_decode_token(token)
- if not user:
- raise HTTPException(
- status_code=status.HTTP_401_UNAUTHORIZED,
- detail="Invalid authentication",
- headers={"WWW-Autehticated": "Bearer"}
- )
- return user
-
- def get_current_active_user(current_user: User = Depends(get_current_user)):
- if current_user.disabled:
- raise HTTPException(status_code=400, detail="Inactive user")
- return current_user
-
-
- @app.post("/login")
- def login(form_data: OAuth2PasswordRequestForm = Depends()):
- """
- 校验密码
- 目前我们已经从数据库中获取了用户数据,但尚未校验密码。
- 让我们首先将这些数据放入 Pydantic UserInDB 模型中。
- 永远不要保存明文密码,因此,我们将使用(伪)哈希密码系统。
- 如果密码不匹配,我们将返回同一个错误。
- """
- user_dict = fake_db_users.get(form_data.username)
- if not user_dict:
- raise HTTPException(status_code=400, detail="Invalid username or password")
- user = UserInDB(**user_dict)
- hashed_password = fake_hash_password(form_data.password)
- if hashed_password != user.hashed_password:
- raise HTTPException(status_code=400, detail="Invalid username or password")
- return {"access_token": user.username, "token_type": "bearer"}
-
- @app.get("/users/me")
- def read_users_me(current_user: User = Depends(get_current_active_user)):
- return current_user
-
-
- if __name__ == '__main__':
- import uvicorn
- uvicorn.run("main:app", reload=True, debug=True)
其实很简单,我们就是在获取用户信息依赖的基础上增加了另一个是否是active的判断的依赖。
def get_current_active_user(current_user: User = Depends(get_current_user)):
if current_user.disabled:
raise HTTPException(status_code=400, detail="Inactive user")
return current_user
@app.get("/users/me")
def read_users_me(current_user: User = Depends(get_current_active_user)):
return current_user