REST是一种软件架构模式,它的核心概念包括:资源,资源在REST中代表的是URL,表示一个网络实体。它的使用可以使操作关系变得更加有序,增强URL的可读性,使资源描述与视图松耦合,并且可以提供OpenAPI,便于第三方集成等。下面就来讲讲flask的restful代码风格
首先,需要引入flask-restful资源包
from flask_restful import *
引入包后将进入正式的使用环节,包括以下三个基本步骤
# 在restful风格中想返回中文,需要取消ascii编码的配置
app.config["RESTFUL_JSON"] = {"ensure_ascii": False}
# 1、创建组件对象
组件对象 = Api(app)
# 定义返回数据格式输出
types_fields = {
'id': fields.Integer,
'name': fields.String(attribute='type_name')
}
# 创建解析对象
type_parser = reqparse.RequestParser()
# 添加解析后的入参
type_parser.add_argument('typeName',type=str,required=True,help='必须添加新闻分类名',location='form')
# 2、定义类视图
class 自定义视图类(Resource):
# 给指定函数添加指定的装饰器
method_decorators = {
"get": [mydecorator1, mydecorator2],
"post": [mydecorator2]
}
# 使用marshal序列化返回字典类型1
@marshal_with(types_fields)
def get(self):
types = NewsType.query.all()
return types
def post(self):
# 获取入参
args = type_parser.parse_args()
typeName = args.get('typeName')
# 操作数据
newsType = NewsType()
newsType.Type_name = typeName
db.session.add(newsType)
db.session.commit()
# 使用marshal序列化返回字典类型2 [newsType:type_fields]
return marshal(newsType,types_fields)
# 3、组件添加类视图
组件对象.add_resource(视图类, URL资源段)
Example
项目架构(此处没有编写相应的html,可根据需求与前几章内容自行补充)
app.py
from flask_migrate import Migrate, MigrateCommand
from flask_script import Manager
from app.models.news_model import *
from app.models.user_model import *
import flask_caching.backends
from app import create_app
from exts import db
app = create_app()
manager = Manager(app=app)
migrate = Migrate(app=app,db=db)
manager.add_command('db',MigrateCommand)
if __name__ == '__main__':
manager.run()
settings.py
import os
class Config:
DEBUG = True
SQLALCHEMY_DATABASE_URI = 'mysql://root:123456@127.0.0.1:3306/newsdb'
SQLALCHEMY_TRACK_MODIFICATIONS = True
SQLALCHEMY_ECHO = True
# secret_key加密码
SECRET_KEY = 'JIAMIMA'
RECAPTCHA_PUBLIC_KEY = "SHAJDGBCYSDCUDUSAD"
RECAPTCHA_PRIVATE_KEY = "AUERHDNSBDSDSADSLADDQ"
RECAPTCHA_PARAMETERS = {'hl':'zh','render':'explicit'}
RECAPTCHA_DATA_ATTRS = {'theme':'dark'}
# 项目路径
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
# 静态文件夹的路径
STATIC_DIR = os.path.join(BASE_DIR,'static')
TEMPLATE_DIR = os.path.join(BASE_DIR,'templates')
# 头像的上传目录
UPLOAD_ICON_DIR = os.path.join(STATIC_DIR,'upload/icon')
# 相册的上传目录
UPLOAD_PHONE_DIR = os.path.join(STATIC_DIR,'upload/phone')
class DevelopmentConfig(Config):
ENV = 'development'
DEBUG = True
class ProductionConfig(Config):
ENV = 'production'
DEBUG = True
exts下的init.py
from flask_caching import Cache
from flask_cors import CORS
from flask_restful import Api
from flask_sqlalchemy import SQLAlchemy
import pymysql
pymysql.install_as_MySQLdb()
db = SQLAlchemy()
cors = CORS()
cache = Cache()
app下的init.py
from flask import Flask
from app.apis.news_api import news_bp
from app.apis.user_api import user_bp
from exts import db, cors, cache
from settings import DevelopmentConfig
config = {
'CACHE_TYPE': 'redis',
'CACHE_REDIS_HOST': '127.0.0.1',
'CACHE_REDIS_PORT': 6379
}
def create_app():
app = Flask(__name__,static_folder='../static',template_folder='../templates')
app.config.from_object(DevelopmentConfig)
db.init_app(app=app)
cors.init_app(app=app,supports_credentials=True)
cache.init_app(app=app,config=config)
app.register_blueprint(news_bp)
app.register_blueprint(user_bp)
return app
utils下的init.py
from random import random
from flask import request, g
from flask_restful import abort
from app.models.user_model import User
from app.utils.smssend import SmsSendAPIDemo
from exts import cache
def sendMessage(phone):
SECRET_ID = "********" # 产品密钥ID,产品标识
SECRET_KEY = "************" # 产品私有密钥,服务端生成签名信息使用,请严格保管,避免泄露
BUSINESS_ID = "***************" # 业务ID,易盾根据产品业务特点分配
api = SmsSendAPIDemo(SECRET_ID,SECRET_KEY,BUSINESS_ID)
# secret_pair = SecretPair(SECRET_ID,SECRET_KEY)
# api = SmsSendAPIDemo(BUSINESS_ID,secret_pair)
# 随机产生验证码
code = ""
for i in range(4):
ran = random.randint(0,9)
code += str(ran)
params = {
"mobile": phone,
"templateId": "10084",
"paramType": "json",
"params": {'code':code,'time':'20211224'}
# 国际短信对应的国际编码(非国际短信接入请注释掉该行代码)
# "internationalCode": "对应的国家编码"
}
ret = api.send(params)
return ret,code
def check_user():
auth = request.headers.get('Authorization')
if not auth:
abort(401, msg = '请先登录')
mobile = cache.get(auth)
if not mobile:
abort(402, mag = '无效的令牌')
user = User.query.filter(User.phone==mobile).first()
if not user:
abort(403, msg = '此用户已被管理员删除!')
g.user = user
def login_required(func):
def wrapper(*args,**kwargs):
check_user()
return func(*args,**kwargs)
return wrapper
smssend.py
import hashlib
from hashlib import md5
import json
import random
import time
import urllib
import urllib.request
class SmsSendAPIDemo(object):
API_URL = "https://sms.dun.163.com/v2/sendsms"
VERSION = "v2"
def __init__(self, secret_id, secret_key, business_id):
"""
Args:
secret_id (str) 产品密钥ID,产品标识
secret_key (str) 产品私有密钥,服务端生成签名信息使用
business_id (str) 业务ID,易盾根据产品业务特点分配
"""
self.secret_id = secret_id
self.secret_key = secret_key
self.business_id = business_id
def gen_signature(self, params=None):
"""生成签名信息
Args:
params (object) 请求参数
Returns:
参数签名md5值
"""
buff = ""
for k in sorted(params.keys()):
buff += str(k) + str(params[k])
buff += self.secret_key
return hashlib.md5(buff.encode("utf-8")).hexdigest()
def send(self, params):
"""请求易盾接口
Args:
params (object) 请求参数
Returns:
请求结果,json格式
"""
params["secretId"] = self.secret_id
params["businessId"] = self.business_id
params["version"] = self.VERSION
params["timestamp"] = int(time.time() * 1000)
params["nonce"] = int(random.random() * 100000000)
params["signature"] = self.gen_signature(params)
try:
params = urllib.parse.urlencode(params)
params = params.encode('utf-8')
request = urllib.request.Request(self.API_URL, params)
content = urllib.request.urlopen(request, timeout=5).read()
return json.loads(content)
# response = request.post(self.API_URL, data=params)
# return response.json()
except Exception as ex:
print("调用API接口失败:", str(ex))
if __name__ == "__main__":
"""示例代码入口"""
SECRET_ID = "***********" # 产品密钥ID,产品标识
SECRET_KEY = "***************************" # 产品私有密钥,服务端生成签名信息使用,请严格保管,避免泄露
BUSINESS_ID = "*******************************" # 业务ID,易盾根据产品业务特点分配
api = SmsSendAPIDemo(SECRET_ID, SECRET_KEY, BUSINESS_ID)
params = {
"mobile": "15010185644",
"templateId": "10084",
"paramType": "json",
"params": "{'code':200','time':'20211224'}"
# 国际短信对应的国际编码(非国际短信接入请注释掉该行代码)
# "internationalCode": "对应的国家编码"
}
ret = api.send(params)
if ret is not None:
if ret["code"] == 200:
taskId = ret["data"]["taskId"]
print("taskId = %s" % taskId)
else:
print ("ERROR: ret.code=%s,msg=%s" % (ret['code'], ret['msg']))
models下的user_model.py
from app.models import BaseModel
from exts import db
class User(BaseModel):
username = db.Column(db.String(50),nullable=False)
password = db.Column(db.String(128),nullable=False)
phone = db.Column(db.String(11),unique=True,nullable=False)
icon = db.Column(db.String(256))
newsList = db.relationship('News',backref='author')
comment = db.relationship('Comment',backref='user')
replys = db.relationship('Reply',backref='user')
def __str__(self):
return self.username
models下的news_model.py
from app.models import BaseModel
from exts import db
class NewsType(BaseModel):
__tablename__='news_type'
type_name = db.Column(db.String(50),nullable=False)
newsList = db.relationship('News',backref='newstype')
class News(BaseModel):
__tablename__ = 'news'
title = db.Column(db.String(100),nullable=False)
content = db.Column(db.Text,nullable=False)
desc = db.Column(db.String(255),nullable=False)
news_type_id = db.Column(db.Integer,db.ForeignKey('news_type.id'))
user_id = db.Column(db.Integer,db.ForeignKey('user.id'))
comments = db.relationship('Comment',backref='news')
def __str__(self):
return self.title
class Comment(BaseModel):
__tablename__ = 'comment'
content = db.Column(db.String(255),nullable=False)
love_num = db.Column(db.Integer,default=0)
user_id = db.Column(db.Integer,db.ForeignKey('user.id'))
news_id = db.Column(db.Integer,db.ForeignKey('news.id'))
replys = db.relationship('Reply',backref='replys')
def __str__(self):
return self.content
class Reply(BaseModel):
__tablename__ = 'reply'
content = db.Column(db.String(255), nullable=False)
love_num = db.Column(db.Integer, default=0)
user_id = db.Column(db.Integer, db.ForeignKey('user.id'))
comment_id = db.Column(db.Integer,db.ForeignKey('comment.id'))
def __str__(self):
return self.content
apis下的user_api.py
import uuid
from random import random
from flask import Blueprint, jsonify, session, render_template
from flask_restful import Api, Resource, reqparse, inputs, fields, marshal
from werkzeug.security import generate_password_hash, check_password_hash
from app.models.user_model import User
from app.utils import sendMessage
from exts import cache, db
user_bp = Blueprint('user',__name__)
api = Api(user_bp)
sms_parser = reqparse.RequestParser()
sms_parser.add_argument('mobile',type=inputs.regex(r'^1[356789]\d{9}$'),required=True,help='手机号码格式错误',location=['form','args'])
class toIndexApi(Resource):
def get(self):
return render_template('test.html')
# 发送手机验证码
class SendMessageApi(Resource):
def post(self):
args = sms_parser.parse_args()
mobile = args.get()
ret,code = sendMessage(mobile)
if ret is not None:
if ret["code"] == 200:
cache.set(mobile+"_code_l", code, timeout=180)
return jsonify(cood=200,msg='短信发送成功!')
else:
print ("ERROR: ret.code=%s,msg=%s" % (ret['code'], ret['msg']))
return jsonify(cood=200, msg='短信发送失败!')
return {'status':400,'msg':'验证码f发送有误'}
# 输入
loginRegister_parser = sms_parser.copy()
loginRegister_parser.add_argument('code',type=inputs.regex(r'^\d{4}$'),help='必须输入四位数字验证码',required=True,location=['form','args'])
# 输出
user_fields = {
'id': fields.Integer,
'username': fields.String
}
# 用户的登录和注册
class LoginAndRegisterApi(Resource):
def post(self):
args = loginRegister_parser.parse_args()
mobile = args.get('mobile')
code = args.get('code')
cache_code = cache.get(mobile+'_code_l')
if cache and code == cache_code:
# 数据库查询是否存在相应的mobile
users = User.query.filter(User.phone==mobile).all().first()
if not users:
# 注册处理
user = User()
user.phone = mobile
s = ''
for i in range(13):
ran = random.randint(0.9)
s += str(ran)
user.username = '用户' + s
db.session.add(user)
db.session.commit()
# 登陆处理 记住登录状态:session,cookie,cache(redis)
# 说明用户是登陆成功的
token = str(uuid.uuid4()).replace('-', '') + str(random.randint(100, 999))
# 存储用户的登录信息
cache.set(token, mobile)
# cache.set(mobile+'_status',1)
data = {
'status': 200,
'msg': '用户登录成功!',
'token': token,
'users': marshal(users,user_fields) # 输出的内容定制
}
return data
else:
return {'errormsg':'验证码错误','status': 400}
# 忘记密码
class ForgetPasswordApi(Resource):
def get(self):
s = 'qwertyuiopasdfghjklzxcvbnmMNBVCXZLKJHGFDSAPOIUYTREWQ1234567890'
code = ' '
for i in range(4):
ran = random.choice(s)
code += ran
# 保存code
session['code'] = code
return {'code': code}
# 申请重置密码的输入
reset_parser = reqparse.RequestParser()
reset_parser.add_argument('imageCode',type=inputs.regex(r'^[a-zA-Z0-9]{4}$'),help='必须输入正确格式的验证码',location=['form','args'])
# 申请重置密码
class ResetPasswordApi(Resource):
def get(self):
args = reset_parser.parse_args()
mobile = args.get('mobile')
imageCode = args.get('imageCode')
code = session.get('code')
if code and imageCode.lower() == code.lower():
# 判断手机号码
user = User.query.filter(User.phone == mobile).first()
if user:
# 发送手机验证码
ret, smscode = sendMessage(mobile)
if ret is not None:
if ret["code"] == 200:
cache.set(mobile + "_code_r", smscode, timeout=180)
return jsonify(status=200, msg='短信发送成功!')
else:
print("ERROR: ret.code=%s,msg=%s" % (ret['code'], ret['msg']))
return jsonify(status=200, msg='短信发送失败!')
else:
return {'status':400,'msg':'验证码发送有误'}
else:
return {'status':400,'msg':'此用户未注册,请先注册!'}
else:
return {'status':400,'msg':'验证码输入有误或者超时'}
# 更新密码
# 客户端要传入的信息
update_parser = loginRegister_parser.copy()
update_parser.add_argument('password',
type=inputs.regex(r'^(?=.*\d)(?=.*[a-z])(?=.*[A-Z])[a-zA-Z0-9]{8,10}$'),
help='必须输入包含大小写字母和数字组合,不能使用特殊字符',
location='form')
update_parser.add_argument('repassword',
type=inputs.regex(r'^(?=.*\d)(?=.*[a-z])(?=.*[A-Z])[a-zA-Z0-9]{8,10}$'),
help='必须输入包含大小写字母和数字组合,不能使用特殊字符',
location='form')
class UpdatePasswordApi(Resource):
def post(self):
args = update_parser.parse_args()
code = args.get('code')
mobile = args.get('mobile')
cache_code = cache.get(mobile+'_code_r')
# 判断验证码是否正确
if code and cache_code.low() == code.lower():
user = User.query.filter(User.phone == mobile).first()
password = args.get('password')
repassword = args.get('repassword')
# 判断两次密码是否一样
if password == repassword:
user.password = generate_password_hash(password)
db.session.commit()
return {'status':200,'msg':'设置密码成功!'}
else:
return {'status':400,'msg':'两次密码不一致!'}
else:
return {'status':400,'msg':'验证码有误!'}
# 账号密码登录
# 设置需要前端传入的内容
password_login_parser = sms_parser.copy()
password_login_parser.add_argument('password',type=str,help='必须输入密码',required=True,location='form')
class PasswordLoginApi(Resource):
def post(self):
args = password_login_parser.parse_args()
mobile = args.get('mobile')
password = args.get('password')
# 判断用户
user = User.query,filter(User.phone == mobile).first()
if user:
if check_password_hash(user.password,password):
# 说明用户是登陆成功的
token = str(uuid.uuid4()).replace('-','')+str(random.randint(100,999))
# 存储用户的登录信息
cache.set(token,mobile)
# cache.set(mobile+'_status',1)
return {'status':200,'msg':'用户登录成功!','token':token}
return {'status':400,'msg':'账户名或密码有误!'}
# 访问首页
api.add_resource(toIndexApi,'/index')
# 发送手机验证码
api.add_resource(SendMessageApi,'/sms')
# 用户的登录和注册
api.add_resource(LoginAndRegisterApi,'/code_login')
# 忘记密码
api.add_resource(ForgetPasswordApi,'/forget_password')
# 申请重置密码
api.add_resource(ResetPasswordApi,'/reset_password')
# 重置密码
api.add_resource(UpdatePasswordApi,'/update_password')
# 账号密码登录
api.add_resource(PasswordLoginApi,'/pwdlogin')
apis下的news_api.py
from flask import Blueprint, app, g
from flask_restful import Api, Resource, fields, marshal_with, reqparse, marshal
from app.models.news_model import NewsType, News
from app.utils import login_required
from exts import db
news_bp = Blueprint('news',__name__,url_prefix='/news')
api = Api(news_bp)
# 新闻类型输出格式
types_fields = {
'id': fields.Integer,
'name': fields.String(attribute='type_name')
}
# 新闻类型添加传入
type_parser = reqparse.RequestParser()
type_parser.add_argument('typeName',type=str,required=True,help='必须添加新闻分类名',location='form')
# 新闻类型修改传入
update_type_parser = type_parser.copy()
update_type_parser.add_argument('id',type=int,required=True,help='必须要添加修改的新闻分类id')
# 新闻类型删除传入
delete_type_parser = reqparse.RequestParser()
delete_type_parser.add_argument('id',type=int,required=True,help='必须要添加修改的新闻分类id')
# 新闻类型api
class NewsTypeApi(Resource):
@marshal_with(types_fields)
def get(self):
types = NewsType.query.all()
# print(app.url_map)
return types
# 使用post添加新闻类型
def post(self):
args = type_parser.parse_args()
typeName = args.get('typeName')
# 数据库添加
newsType = NewsType()
newsType.Type_name = typeName
db.session.add(newsType)
db.session.commit()
return marshal(newsType,types_fields)
# 修改分类名称
def put(self):
args = update_type_parser.parse_args()
typeId = args.get('id')
new_type_name = args.get('typeName')
type_obj = NewsType.query.get(typeId)
if type_obj:
type_obj.type_name = new_type_name
db.session.commit()
data = {
'status': 200,
'msg': '修改成功',
'type': marshal(type_obj,types_fields)
}
else:
data={
'status': 400,
'msg': '类型查找失败!',
}
return data
# 删除分类名称
def delete(self):
args = delete_type_parser.parse_args()
typeId = args.get('id')
type_obj = NewsType.query.get(typeId)
if type_obj:
db.session.delete(type_obj)
db.session.commit()
data = {
'status': 200,
'msg': '类型删除成功!',
}
else:
data = {
'status': 400,
'msg': '类型删除失败!',
}
return data
news_parser = reqparse.RequestParser()
news_parser.add_argument('typeid',type=int,help='必须添加新闻类型id',required=True)
news_parser.add_argument('page',type=int)
# 自定义fields
class AuthorName(fields.Raw):
def format(self,value):
return value.username
# 每条新闻的格式
news_fields = {
'id': fields.Integer,
'title': fields.String,
'desc': fields.String,
'datetime': fields.DateTime(attribute='date_time'),
'author': AuthorName(attribute='author'),
'url': fields.Url('newdetail',absolute=True) # absolute=True代表绝对路径
}
# 新闻api
class NewsListApi(Resource):
# 获取某个新闻分类下的新闻
def get(self):
args = news_parser.parse_args()
typeid = args.get('typeid')
page = args.get('page',1)
# newsType = NewsType.query.get(typeid)
pagination = NewsType.query.filter(NewsType.id==typeid).paginate(page=page,per_page=8)
data = {
'has_more': pagination.has_next,
'data': marshal(pagination.items,news_fields),
'return_count': len(pagination.items),
'html': 'null'
}
return data
# 回复的格式
reply_fields = {
'user': AuthorName(attribute='user'),
'content': fields.String,
'datetime': fields.DateTime(attribute='date_time'),
'lovenum': fields.Integer(attribute='love_num')
}
# 评价的格式
comment_fields = {
'user': AuthorName(attribute='user'),
'content': fields.String,
'datetime': fields.DateTime(attribute='date_time'),
'lovenum': fields.Integer(attribute='love_num'),
'replys': fields.List(fields.Nested(reply_fields))
}
news_detail_fields = {
'id': fields.Integer,
'title': fields.String,
'content': fields.String,
'datetime': fields.DateTime(attribute='date_time'),
'author': AuthorName(attribute='author'),
'comment': fields.List(fields.Nested(comment_fields))
}
class NewsDetailApi(Resource):
@marshal_with(news_detail_fields)
def get(self,id):
news = News.query.get(id)
return news
def post(self):
pass
# 定义新闻发布的传入
add_news_parser = reqparse.RequestParser()
add_news_parser.add_argument('title',type=str,required=True,help='必须填写新闻标题')
add_news_parser.add_argument('content',type=str,required=True,help='必须填写新闻主体内容')
add_news_parser.add_argument('typeid',type=int,required=True,help='必须填写新闻类型id')
class NewsApi(Resource):
@login_required
def post(self):
args = add_news_parser.parse_args()
title = args.get('title')
content = args.get('content')
typeId = args.get('typeid')
news = News()
news.title = title
news.content = content
news.news_type_id = typeId
news.desc = content[:100]+'......'
news.user_id = g.user.id
db.session.add(news)
db.session.commit()
data = {
'status': 200,
'msg': '新闻发布成功!',
'news': marshal(news,news_detail_fields)
}
return data
def patch(self):
pass
def put(self):
pass
def delete(self):
pass
api.add_resource(NewsTypeApi,'/types')
api.add_resource(NewsListApi,'/newslist')
api.add_resource(NewsDetailApi,'/newsdetail/' ,'newsdetail')
api.add_resource(NewsApi,'/news')