本示例说明

需求:在 mysql 数据库包含了两个表: tb_dept, tb_emp。现在需要开发接口完成对这个表的数据进行增删查改。

本示例结构是按照官网示例划分的,感觉还行。下面关于文件划分的关键点也会进行说明

图

1 基于已有表映射 ORM 模型

本地生成 ORM 映射模型

sqlacodegen: 自动生成 sqlalchemy 表对象模型.

安装

pip install sqlacodegen

使用:

sqlacodegen --outfile model.py driver://user:pass@localhost/dbname 
# 默认是全部表, 加上参数 --tables 指定表名
sqlacodegen --tables tablesName ...

生成的文件:

# coding: utf-8
from sqlalchemy import Column, ForeignKey, String
from sqlalchemy.dialects.mysql import INTEGER
from sqlalchemy.orm import relationship

from app.db.base_class import Base

class TbDept(Base):
    __tablename__ = 'tb_dept'

    dno = Column(INTEGER(11), primary_key=True, comment='编号')
    dname = Column(String(10), nullable=False, comment='名称')
    dloc = Column(String(20), nullable=False, comment='所在地')

class TbEmp(Base):
    __tablename__ = 'tb_emp'

    eno = Column(INTEGER(11), primary_key=True, comment='员工编号')
    ename = Column(String(20), nullable=False, index=True, comment='姓名')
    job = Column(String(20), nullable=False, comment='员工职位')
    mgr = Column(INTEGER(11), index=True, comment='主管编号')
    sal = Column(INTEGER(11), nullable=False, comment='员工月薪')
    comm = Column(INTEGER(11), comment='每月补贴')
    dno = Column(ForeignKey('tb_dept.dno', ondelete='CASCADE', onupdate='CASCADE'), index=True, comment='所在部门编号')

    test = Column(Boolean, default=False, comment='test')

    tb_dept = relationship('TbDept')

这里是将 Base 原本的Base = declarative_base(),替换为了在其他文件创建的 Base, 方便后续管理。

使用 Alembic 迁移工具管理数据库

相较于上次的项目演示 ,这次的 env.py 配置 是按照官网的演示将多个 ORM 类导入到 一个文件里面进行统一在进行导出

# /app/db/base.py

# 导入所有的模型,已被 Alembic 检索到

from app.db.base_class import Base
from app.models.hrs import TbDept, TbEmp
# /alembic/env.py
# ...

import os
import sys
# 这是bug吗? 此文件运行时的位置始终在 这里,而非在此文件夹外部,此时导入 自定义包的位置报错
sys.path.append(os.path.dirname(os.pardir))

from app.db.base import Base

target_metadata = Base.metadata

# ...
# 导入自定义的数据库链接
def get_url():
    from app.core.config import settings
    return settings.SQLALCHEMY_DATABASE_URL

# ... 注意后面两个函数导入的url是来自外部 .ini 文件的,需要替换(上篇文章已详细介绍,此不再赘述

最后生成 迁移脚本文件 并更新到数据库就行了。

2 创建 Pydantic 模型

# /app/schemas/tb_emp.py
from typing import List

from pydantic import Field
from pydantic.main import BaseModel


class TbEmpBase(BaseModel):
    eno: int


# del 和 查询
class TbEmpDel(TbEmpBase):
    pass


# 供其他 pydantic模型调用, 作为返回体的一部分
class Pydantic2TbEmp(TbEmpBase):
    ename: str
    job: str
    mgr: int = None

    class Config:
        orm_mode = True


# 返回全部信息,包括部门
class TbEmp(Pydantic2TbEmp):    
    # 注意 python模块间相互导入的问题
    from app.schemas.tb_dept import Pydantic2TbDept
    # 这里希望返回部门的全部字段, 注意指定的别名 是 ORM类的属性名
    department: Pydantic2TbDept = Field(None, alias='tb_dept')

# /app/schemas/tb_dept.py
from typing import List

from pydantic import BaseModel, Field


class TbDeptBase(BaseModel):
    dno: int


# 创建,修改
class TbDeptCreate(TbDeptBase):
    dname: str
    dloc: str


class Pydantic2TbDept(TbDeptBase):
    dname: str
    dloc: str

    class Config:
        orm_mode = True


class TbDept(Pydantic2TbDept):
    # 该约束下返回所有员工
    from app.schemas.tb_emp import Pydantic2TbEmp
    employees: List[Pydantic2TbEmp] = Field([], alias='tb_emp')


# 仅存有 id
class TbDeptDel(TbDeptBase):
    pass

注意在包 schemas 的是将其导入的,以方便后续外部使用如 app.schemas.tb_dept.TbDept -> app.schemas.TbDept

# app/schemas/__init__.py
from .tb_emp import TbEmp, TbEmpDel
from .tb_dept import TbDept, TbDeptDel, TbDeptCreate

3 创建 CURD 操作函数的文件

官网的演示项目,是用 CURD 类,进行操作。暂时没有此需求(大概原理是将 CURD 抽离,然后其 CURDBase 类接收子类传入的 model, 和数据),后续有时间在更新。

现在还是按照函数的方式:如下

# app/curd/curd_dept.py
from fastapi import HTTPException
from fastapi.encoders import jsonable_encoder
from sqlalchemy.orm import Session

from app.models.hrs import TbDept
from app.schemas import tb_dept


def add_dept(db: Session, item: tb_dept.TbDeptCreate):
    obj = db.query(TbDept).get(item.dno)
    if obj:
        raise HTTPException(status_code=400, detail='该用户库中已存在')

    # 将对象转为字典
    item_dict = jsonable_encoder(item)
    # 得到一个 ORM对象实例
    db_obj = TbDept(**item_dict)
    db.add(db_obj)
    db.commit()
    # 刷新,以获得对象在数据库的一些属性
    db.refresh(db_obj)
    return db_obj


def del_dept(db: Session, dno: tb_dept.TbDeptDel):
    item = db.query(TbDept).get(dno.dno)
    if not item:
        raise HTTPException(status_code=400, detail="没有此 员工编号")
    db.delete(item)
    db.commit()
    return item


def get_all(db: Session):
    return db.query(TbDept).all()


def update(db: Session, *, dno: int, new_item: tb_dept.TbDeptCreate):
    item = db.query(TbDept).get(dno)
    if not item:
        raise HTTPException(status_code=400, detail={'msg': '服务器没有此对象'})
    # 如果字段过多,应该考虑在 bese类定义魔法方法 getattr等使其支持 []语法,然后遍历属性完成赋值
    item.dno = new_item.dno
    item.dloc = new_item.dloc
    item.dname = new_item.dname
    db.commit()
    return item

# app/curd/curd_emp.py
from fastapi import HTTPException
from sqlalchemy.orm import Session
from app.models.hrs import TbEmp


def get_emp(db: Session, eno: int):
    # 按照编号查询
    return db.query(TbEmp).get(eno)


def remove_emp(db: Session, eno: int):
    item = db.query(TbEmp).get(eno)
    if not item:
        raise HTTPException(status_code=400, detail="没有此 员工编号")
    db.delete(item)
    db.commit()
    # ** 
    return db.merge(item)


def get_emp_all(db: Session):
    # 注意此处没有做分页
    return db.query(TbEmp).all()

tips: 此处若使用 async def 则需要等待 await

delete 提交后不能访问其内部数据的问题

return db.merge(item)注意此处 ,删除并提交后, item 将不能被访问。使用 merge 方法更新此条记录的存活周期,此方法不是针对于此的

此方法与 update 类似,但是如果数据库中没有该记录,使用 merge 执行 insert ,不会报错,而使用 update 会报错。

下面分离此对象的方式是无效的: db.expunge(item) return item

将此函数按照协程的方式 按照异步在调用时 await, 也是不能直接返回此对象。so 能将就就将就吧。


同样的方法在 curd_dept 中就没有此问题(可以直接返回 item)。见了鬼了。并不是因为 删除的还需要显示映射另一个表

问题的根本原因还是没有找到,不过解决方法是有很多的。最简单是删除前 copy 此对象就行

4 创建路由

# app/api/api_v1/endpoints/emp.py
from typing import List

from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.orm import Session

from app import schemas
from app.api.dep import get_db
from app.curd import curd_emp

router = APIRouter()


@router.get('/query/{eno}/', response_model=schemas.TbEmp, name='查询单个员工')
async def query(eno: int, db: Session = Depends(get_db)):
    item = curd_emp.get_emp(db, eno)
    if not item:
        raise HTTPException(status_code=400, detail="没有此 员工编号")
    return item


@router.post('/del/{eno}/', response_model=schemas.TbEmp, name='删除单个员工')
async def del_eno(eno: int, db: Session = Depends(get_db)):
    return curd_emp.remove_emp(db, eno)


@router.get('/all/', response_model=List[schemas.TbEmp])
async def emp_all(db: Session = Depends(get_db)):
    return curd_emp.get_emp_all(db)

还有一个文件限于篇幅,就不贴上来了。操作都差不多

5 注册路由及启动程序

1 在 api_v1 进行归纳并注册

from fastapi import APIRouter

from app.api.api_v1.endpoints import emp, dept

api_router = APIRouter()
api_router.include_router(emp.router, prefix="/emp", tags=['emp'])
api_router.include_router(dept.router, prefix='/dept', tags=['dept'])

2 在 mian 文件注册 v1 以便访问,并配置跨域等与服务相关的东西(此处没有配置

from fastapi import FastAPI

from app.api.api_v1.api import api_router


def create_app():
    app = FastAPI()
    app.include_router(api_router, prefix='/v1')
    return app

tips: 注意此处并没像官网那样,因为项目不止包含此 app,同时还有 alembic 等于 api 无关的一些东西,导致进入项目还需要进行 cd ./app跳转来启动服务。

所以此处将 app 作为函数返回值,让外部接收。这样就可以直接在项目根目录启动了。

3 在 run.py 里面导入 app

from app.main import create_app

app = create_app()

if __name__ == '__main__':
    import uvicorn
    uvicorn.run(app, host="127.0.0.1", port=8000)

4 启动程序进行测试 $ uvicorn run:app --reload


相关推荐:

来自系列:fastapi

分类 python下文章:

1.0 爬虫的介绍,和requests模块的简单使用

1.1 数据解析的三种方式。正则表达式, bs4, xpath

2.0 多任务(进程,协程,线程)爬虫:验证码识别,返回头储存,ip代理 介绍。 异步是什么,爬虫异步的方式。线程,进程,介绍

2.0.1 协程的 async/await 实现 爬虫 单线程 + 异步协程的实现

3.0 基于selenium 模块的 爬虫操作。 selenium 是一个用于Web应用程序测试的工具。Selenium测试直接运行在浏览器中,就像真正的用户在操作一样。

更多...

aid 加一试试->

评论([[comments.sum]])

发表

加载更多([[item.son.length - 2]])...

发表

2020-11 By Hchuan.

flask & bootstrap-flask

© 2021 HChuan. All rights reserved.

随机占位图来自:fghrsh

互联网ICP备案号:蜀ICP备2020031846号