既然我们已经有了所有的安全流程,就让我们来使用 JWT 令牌和安全哈希密码让应用程序真正地安全。
关于 JWT
它是一个将 JSON 对象编码为密集且没有空格的长字符串的标准。字符串看起来像这样:
eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c
它没有被加密,因此任何人都可以从字符串内容中还原数据。
但它经过了签名。因此,当你收到一个由你发出的令牌时,可以校验令牌是否真的由你发出。
通过这种方式,你可以创建一个有效期为 1 周的令牌。然后当用户第二天使用令牌重新访问时,你知道该用户仍然处于登入状态。
一周后令牌将会过期,用户将不会通过认证,必须再次登录才能获得一个新令牌。而且如果用户(或第三方)试图修改令牌以篡改过期时间,你将因为签名不匹配而能够发觉。
我们看下如何实现?
pip install python-jose
pip install passlib 还需要安装 bcrypt pip install bcrypt
创建一个工具函数以哈希来自用户的密码。
然后创建另一个工具函数,用于校验接收的密码是否与存储的哈希值匹配。
再创建另一个工具函数用于认证并返回用户。
创建用于设定 JWT 令牌签名算法的变量 「ALGORITHM」,并将其设置为 "HS256"。
创建一个设置令牌过期时间的变量。
定义一个将在令牌端点中用于响应的 Pydantic 模型。
创建一个生成新的访问令牌的工具函数。
get_current_user使用的是 JWT 令牌解码,接收到的令牌,对其进行校验,然后返回当前用户。
如果令牌无效,立即返回一个 HTTP 错误。
使用令牌的过期时间创建一个 timedelta 对象。
创建一个真实的 JWT 访问令牌并返回它。
我们最后看下实现代码
- from fastapi import FastAPI, Depends, status, HTTPException
- from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
-
- from pydantic import BaseModel
- from typing import Optional
- from jose import JWTError, jwt
- from datetime import datetime, timedelta
- from passlib.context import CryptContext
-
- SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
- ALGORITHM = "HS256"
- ACCESS_TOKEN_EXPIRE_MINUTES = 30
-
- oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
-
- fake_db_users ={
- "mrli": {
- "username": "mrli",
- "full_name": "mrli_hanjing",
- "email": "mrli@qq.com",
- "hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",
- "disabled": False
- }
- }
-
- app = FastAPI()
-
- def fake_hash_password(password: str):
- """模拟将密码加密"""
- return password
-
-
- class Token(BaseModel):
- access_token: str
- token_type: str
-
-
- class TokenData(BaseModel):
- username: Optional[str] = None
-
-
- class User(BaseModel):
- username: str
- email: Optional[str] = None
- full_name: Optional[str] = None
- disabled: Optional[bool] = None
-
-
- class UserInDB(User):
- hashed_password: str
-
- pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
-
- def verify_password(plain_password, hashed_password):
- return pwd_context.verify(plain_password, hashed_password)
-
- def get_password_hash(password):
- return pwd_context.hash(password)
-
- def authenticate_user(db_user, username: str, password: str):
- user = get_user(db_user, username)
- if not user:
- return False
- if not verify_password(password, user.hashed_password):
- return False
- return user
-
- def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
- to_encode = data.copy()
- if expires_delta:
- expire = datetime.utcnow() + expires_delta
- else:
- expire = datetime.utcnow() + timedelta(minutes=15)
- to_encode.update({"exp": expire})
- encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
- return encoded_jwt
-
- def get_user(db_users, username: str):
- if username in db_users:
- user_dict = db_users[username]
- return UserInDB(**user_dict)
-
- def fake_decode_token(token):
- """我们模拟返回的token值就是username,所以下面可以直接传token"""
- user = get_user(fake_db_users, token)
- return user
-
- def get_current_user(token: str = Depends(oauth2_scheme)):
- credentials_exception = HTTPException(
- status_code=status.HTTP_401_UNAUTHORIZED,
- detail="Authentication Failed",
- headers={"WWW-Authenticate": "Bearer"}
- )
- try:
- payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
- username: str = payload.get("sub")
- if username is None:
- raise credentials_exception
- token_data = TokenData(username=username)
- except JWTError:
- raise credentials_exception
- user = get_user(fake_db_users, username=token_data.username)
- if not user:
- raise credentials_exception
- return user
-
- def get_current_active_user(current_user: User = Depends(get_current_user)):
- if current_user.disabled:
- raise HTTPException(status_code=400, detail="Inactive user")
- return current_user
-
-
- @app.post("/token", response_model=Token)
- def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
- user = authenticate_user(fake_db_users, form_data.username, form_data.password)
- if not user:
- raise HTTPException(
- status_code=status.HTTP_401_UNAUTHORIZED,
- detail="Incorrect username or password",
- headers={"WWW-Authenticate": "Bearer"}
- )
- access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
- access_token = create_access_token(
- data={"sub": user.username}, expires_delta=access_token_expires
- )
- return {
- "access_token": access_token,
- "token_type": "bearer"
- }
-
- @app.get("/users/me")
- def read_users_me(current_user: User = Depends(get_current_active_user)):
- return current_user
-
-
- if __name__ == '__main__':
- import uvicorn
- uvicorn.run("main:app", reload=True, debug=True)
我们在Swagger UI上看下效果

当我们实现了Oauth2权限校验后就会出现如上图的Authorize 锁的标志,默认密码是secret,当我们输入用户名密码正确后,会返回用户信息,说明校验成功

我们看下接口/user/me的请求
之所以能够正常返回信息是因为我们在请求该接口之前登录了,我们看下该接口的请求头信息,其携带了认证后的token,所以才能正常返回数据:

如果我们先不登录呢,也就是说不登录就不会有认证头的token信息,那么会怎么样呢?

校验失败了
这样就完成了:使用(哈希)密码和 JWT Bearer 令牌的 OAuth2。注意:接口返回的用户不应该返回密码,这个需要在实际中需要屏蔽