• Python Flask


    下面将使用Flask框架来演示一个具有分页和查询条件的简单REST API。

    Flask应用结构

    project/
    │
    ├── app/
    │   ├── __init__.py
    │   ├── models.py
    │   ├── services.py
    │   └── controllers.py
    │
    └── run.py
    

    __init__.py

    这是Flask应用的初始化文件。

    from flask import Flask
    from flask_sqlalchemy import SQLAlchemy
    from flask_marshmallow import Marshmallow
    
    app = Flask(__name__)
    app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///users.db'
    db = SQLAlchemy(app)
    ma = Marshmallow(app)
    
    from app import controllers
    

    models.py

    定义数据库模型。

    from . import db, ma
    
    class User(db.Model):
        id = db.Column(db.Integer, primary_key=True)
        name = db.Column(db.String(100))
        email = db.Column(db.String(120), unique=True)
    
        def __init__(self, name, email):
            self.name = name
            self.email = email
    
    class UserSchema(ma.Schema):
        class Meta:
            fields = ('id', 'name', 'email')
    
    user_schema = UserSchema()
    users_schema = UserSchema(many=True)
    

    services.py

    定义服务层。

    from . import db, models
    
    def get_all_users(page, per_page):
        return models.User.query.paginate(page, per_page, error_out=False)
    
    def get_user_by_id(user_id):
        return models.User.query.get_or_404(user_id)
    
    def add_user(name, email):
        new_user = models.User(name=name, email=email)
        db.session.add(new_user)
        db.session.commit()
        return new_user
    
    def update_user(user_id, name, email):
        user = models.User.query.get(user_id)
        if user:
            user.name = name
            user.email = email
            db.session.commit()
            return user
        return None
    
    def delete_user(user_id):
        user = models.User.query.get(user_id)
        if user:
            db.session.delete(user)
            db.session.commit()
            return True
        return False
    

    controllers.py

    定义控制器层,处理HTTP请求。

    from flask import Blueprint, request, jsonify
    from . import models, services
    
    bp = Blueprint('users', __name__, url_prefix='/api/users')
    
    @bp.route('/', methods=['GET'])
    def search_users():
        page = request.args.get('page', 1, type=int)
        per_page = request.args.get('per_page', 10, type=int)
        name = request.args.get('name', '')
    
        users = services.get_all_users(page, per_page)
        if name:
            users.items = [user for user in users.items if name.lower() in user.name.lower()]
    
        return make_response(200, 'Users fetched successfully.', {
            'total': users.total,
            'pages': users.pages,
            'page': page,
            'per_page': per_page,
            'items': models.users_schema.dump(users.items)
        })
    
    @bp.route('/', methods=['POST'])
    def add_user():
        name = request.json.get('name')
        email = request.json.get('email')
        if not name or not email:
            return make_response(400, 'Name and email are required.')
    
        new_user = services.add_user(name, email)
        return make_response(201, 'User created successfully.', models.user_schema.dump(new_user))
    
    @bp.route('/', methods=['PUT'])
    def update_user(user_id):
        name = request.json.get('name')
        email = request.json.get('email')
        if not name or not email:
            return make_response(400, 'Name and email are required.')
    
        updated_user = services.update_user(user_id, name, email)
        if updated_user:
            return make_response(200, 'User updated successfully.', models.user_schema.dump(updated_user))
        else:
            return make_response(404, 'User not found.')
    
    # 定义的通用响应函数如下:
    def make_response(code, message, data=None):
        response = {
            'code': code,
            'message': message,
            'data': data
        }
        return jsonify(response)
    

    run.py

    运行Flask应用的入口。

    from app import app
    
    if __name__ == '__main__':
        app.run(debug=True)
    

    请注意,上述Python代码使用了Flask、Flask-SQLAlchemy和Flask-Marshmallow库,它们提供了类似Spring Boot的功能,但语法和结构有所不同。在Python中,我们通常不会像Java那样明确地区分服务层、控制器层等,但在大型项目中,仍然可以采用类似的分层架构。

  • 相关阅读:
    python3 requests中文乱码问题之压缩格式问题
    ETHERNET/IP从站转CANOPEN主站连接AB系统的配置方法
    如何在TIA博途中在线更新PLC的CPU固件版本?
    智能配电房监控系统:实现配电智能化管理
    2021年电工杯数学建模B题光伏建筑一体化板块指数发展趋势分析及预测求解全过程论文及程序
    无公网IP通过旁路由openwrt的Zerotier实现和在家一样访问家里每个设备
    【JavaEE进阶系列 | 从小白到工程师】Date类的构造方法以及SimpleDateFormat构造方法与日期转换详解上篇
    使用Perl脚本编写爬虫程序的一些技术问题解答
    【java】ArrayList和LInkedList的区别
    java集合专题_集合体系介绍_集合遍历(1)
  • 原文地址:https://blog.csdn.net/svygh123/article/details/139639222