评论([[comments.sum]])
FastAPI可与任何数据库和任何样式的库一起使用,以与数据库进行通信。
一种常见的模式是使用 ORM :“对象关系映射”库。
数据库连接使用有多种映射模型( ORM.) 如 Peewee, Django-ORM, SQLAIchemy...
本文主要介绍 AQLAIchemy 的连接配置使用
文件结构
这里我们将链接到数据库,并初始化一些配置
# database.py
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
import pymysql
pymysql.install_as_MySQLdb()
# 指定路径
# SQLALCHEMY_DATABASE_URL = "mysql:///./hrs.sql"
SQLALCHEMY_DATABASE_URL = "mysql+pymysql://root:xxxxxx@192.168.3.33:3306/comment_test"
# 建一个SQLAlchemy engine“引擎”
engine = create_engine(
SQLALCHEMY_DATABASE_URL,
# connect_args={"check_same_thread": False} # 仅适用于 AQLite
)
# 建立本地会话 类,该类的每个 实例都是一个数据库会话。
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
# 创建 Base类, 将用来创建每个数据库的模型MOdel (一般都是一个类
# 真正的项目中一般都是将此对象,抽离成一个单独的文件,同时定义一些 公共字段
Base = declarative_base()
这里是建设 SQLAIchemy 与数据库的映射关系
# models.oy
from sqlalchemy import Boolean, Column, ForeignKey, Integer, String
from sqlalchemy.orm import relationship
from .database import Base
# 创建了 两个表的ORM 映射模型
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True, index=True)
# 官网的 演示是没有声明长度的。而我使用的 maridb必须声明 String(16)
email = Column(String(16), unique=True, index=True)
hashed_password = Column(String(128))
is_active = Column(Boolean, default=True)
# 关联两个表,SQLAIchemy 会自动按照外键约束处理。flask 中只用定义一个一般在表关系中 多 的那一个表
# 四天后更新,主要原因是使用了 back_populates, 使用 backerf则只需要再一边定义就行
items = relationship("Item", back_populates="owner")
class Item(Base):
__tablename__ = "items"
id = Column(Integer, primary_key=True, index=True)
title = Column(String(128), index=True)
description = Column(String(256), index=True)
owner_id = Column(Integer, ForeignKey("users.id"))
owner = relationship("User", back_populates="items")
为了避免 SQLAlchemy 模型和 Pydantic 模型之间的混淆,使用带有 Pydantic 模型的 schemas.py( 来自于 SQLAIchemy 模型的映射)文件, 而非是直接使用 数据库的映射文件。
同时也可以 细致定义一个表的某个或多个字段为返回值,而非是 整个表的全部字段
同时也避免了在使用两者时产生混淆
简单理解就是:将 ORM 映射表拆分成不同的字段进行拆分,以供路由 请求参数,及返回参数 的验证(筛选。
如:创建一个 user 只需要 emial 和 password, 而不需要 is_active ,所以对请求体使用 类 UserCreate
。而创建成功则需要返回整个用户,但不需要返回 password 所以使用类User
此类继承自 UserBase
不具有参数 password.
# schemas.py
from typing import List, Optional
from pydantic import BaseModel
# 注意属性的名字,是和ORM的属性名一样的
# 创建一个ItemBase和UserBasePydantic模型(或称“方案”)以在创建或读取数据时具有共同的属性。
class ItemBase(BaseModel):
title: str
description: Optional[str] = None
# 创建一个ItemCreate和并UserCreate从它们继承(这样它们将具有相同的属性),以及创建所需的所有其他数据(属性)。
class ItemCreate(ItemBase):
pass
# 创建Pydantic模型/模式以进行读取/返回, Item, User
class Item(ItemBase):
id: int
owner_id: int
# **tips: 外部
class Config:
# 将要做为 限制响应体 数据库ORM的数据时,请务必开启此配置
orm_mode = True
class UserBase(BaseModel):
email: str
class UserCreate(UserBase):
password: str
class User(UserBase):
id: int
is_active: bool
items: List[Item] = []
class Config:
orm_mode = True
tips: 此类 Config 用于为 Pydantic 提供配置。
orm_mode = True
允许使用 ORM 模式: 可以将任意的类实例创建 Pydantic 模型,支持映射到 ORM 对象的模型。(白话就是可以将可以使用此类的一个方法(User.from_orm(BaseModel.User())
,直接将 ORM 的数据转为 Pydantic 模型的数据)例子: 内部中对于传输数据的取值假如是这样的
User(objName)
,内部默认的取值方式:
id = objName['id']
.因为设置了值为 True, 所以他还会尝试
id = ObjName.id
数据库ORM映射模型时
。请必须设置 ore_mode
属性。否则将会因为不能解析数据而报错对于大部分情况,我们不会将对数据库的交互(增删查改),直接放在路由函数中。而是应该单独抽离出来,以便于以后的修改,及减少代码的重复量...
# curd.py
# 导入此参数,便于声明函数参数类型(已得到 语法提示,及 python声明形参类型的一些特性
from sqlalchemy.orm import Session
# models 交互数据库,schems声明证形参类型
from . import models, schemas
# 创建对数据库操作函数(类似原子类, 以减少路由操作函数中 对数据库操作的重复代码
def get_user(db: Session, user_id: int):
# 查询此 id的用户
return db.query(models.User).filter(models.User.id == user_id).first()
def get_user_by_email(db: Session, email: str):
# 查询此邮箱的用户
return db.query(models.User).filter(models.User.email == email).first()
def get_users(db: Session, skip: int = 0, limit: int = 100):
# 分页查询数据库
return db.query(models.User).offset(skip).limit(limit).all()
def create_user(db: Session, user: schemas.UserCreate):
# 添加用户
# schemas.UserCreate 使用这个类来筛选参数?? python本身不会对声明了参数类型的参数做验证
# 同时本身这个参数是由外部传入路由过来的对象,本身就能保证参数的正确
# 经过简单的验证 python 对于声明了类型的形参不会做出验证
fake_hashed_password = user.password + "notreallyhashed"
# 获得 模型实例
db_user = models.User(email=user.email, hashed_password=fake_hashed_password)
# 添加到数据库会话中
db.add(db_user)
# 提交此处操作
db.commit()
# 刷新模型实例,以获得 数据库返回的 id等字段
db.refresh(db_user)
return db_user
def get_items(db: Session, skip: int = 0, limit: int = 100):
return db.query(models.Item).offset(skip).limit(limit).all()
def create_user_item(db: Session, item: schemas.ItemCreate, user_id: int):
db_item = models.Item(**item.dict(), owner_id=user_id)
db.add(db_item)
db.commit()
db.refresh(db_item)
return db_item
# mian.py
from typing import List
from fastapi import Depends, FastAPI, HTTPException
from sqlalchemy.orm import Session
# 在主文件使用 . 相对路径导入 文件总是报错,所以此处使用绝对路径
# from . import crud, models, schemas
from spL_app import models, schemas, crud
# 导入 会话 及 引擎
from spL_app.database import SessionLocal, engine
# 按照创建的 ORM模型,创建数据库表(存在则不创建,当然一般都是 使用数据库 迁移工具进行数据库的管理,此处仅为演示
models.Base.metadata.create_all(bind=engine)
app = FastAPI()
# Dependency 依赖,处理数据库会话
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
# 下面专门 说明
@app.post("/users/", response_model=schemas.User)
def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
db_user = crud.get_user_by_email(db, email=user.email)
if db_user:
raise HTTPException(status_code=400, detail="Email already registered")
return crud.create_user(db=db, user=user)
@app.get("/users/", response_model=List[schemas.User])
def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
users = crud.get_users(db, skip=skip, limit=limit)
return users
@app.get("/users/{user_id}", response_model=schemas.User)
def read_user(user_id: int, db: Session = Depends(get_db)):
db_user = crud.get_user(db, user_id=user_id)
if db_user is None:
raise HTTPException(status_code=404, detail="User not found")
return db_user
@app.post("/users/{user_id}/items/", response_model=schemas.Item)
def create_item_for_user(
user_id: int, item: schemas.ItemCreate, db: Session = Depends(get_db)
):
return crud.create_user_item(db=db, item=item, user_id=user_id)
@app.get("/items/", response_model=List[schemas.Item])
def read_items(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
items = crud.get_items(db, skip=skip, limit=limit)
return items
在此处每个路由函数仅相当于对 需要的操作按照条件,转发至相应的 crud 中的函数中,并没有对数据库进行直接操作
# 定义返回体类型
@app.post("/users/", response_model=schemas.User)
# 接收 UserCreate中定义的字段 email,password 封装入 user, 同时使用 依赖,注入了一个 db 对象
def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
db_user = crud.get_user_by_email(db, email=user.email)
if db_user:
# 查询到数据库中 已经存在了 此user ,直接返回错误 400
raise HTTPException(status_code=400, detail="Email already registered")
return crud.create_user(db=db, user=user)
response_model 声明了和限制了 响应体的数据 :
基本在各种类型 路由函数装饰器上都可以使用此属性( get, post, ...
可以的接收参数: Paydantic 模型, Paydantic 模型的列表( List[item])
db , 来自 Dependes(get_db) 依赖 注入的 数据库, 他将传递给相应的 crud 文件中的函数进行相应操作
其实从头慢慢看下来,还是很简单的。就是有很多细节。所以 简单按步骤总结下
database 建立数据库的连接,同时初始化了一些类: engine (连接数据库, SessionLocal (建立一次会话, Base(用于建立 ORM 模型
models 建立数据库的映射模型,(可以手动创建然后再数据库中生成。或者使用 迁移工具从数据库映射到本地
schemas.py 创建 Pydantic 模型, 此模型是根据 数据库的 ORM 映射而来(注:再程序里本身并无继承或者引用 models 的内容)
orm_mode = True
curd.py 将对数据库的操作代码抽离出来,作为一个个的函数(这些函数只进行操作,不触及连接数据库及路由错误响应)
main.py 路由文件(实际操作中,此文件 应该只用来进行 初始化,再将一些路由,依赖都抽离出去
alembic 是 sqlalchemy 的作者开发的。用来做 OMR 模型与数据库的迁移与映射。alembic 使用方式跟 git 有点类似.
Alembic 使用 SQLAlchemy 作为数据库引擎,为关系型数据提供创建、管理、更改和调用的管理脚本,协助开发和运维人员在系统上线后对数据库进行在线管理。¶
1 安装
pip install alembic
2 创建迁移环境
alembic init alembic
生成了的文件目录如下:
3 配置 env.py 文件:
导入数据库的 OMR
定义 数据库的连接
修改 env.py 配置文件,使用定义的数据库连接
# 省略了没有修改的代码,毕竟是在原基础上进行修改
# ……
import os
import sys
# 将上级目录添加 导入(import的搜索路径中,否则会找不到 包 spL_app. 迁移工具的运行位置为此文件当前目录
sys.path.append(os.path.dirname(os.pardir))
# 注意此处导入 Base的位置,如果使用 database里导入,则会因为检测不到ORM发生 错误
# 一般项目中的做方法是 专门用一个文件导入既定ORM类和Base 供此和其他地方导入
# from spL_app.database import Base
from spL_app.models import Base
target_metadata = Base.metadata
#...
# 创建此函数以便于方便的修改 url(数据库地址, 如果直接将 url放在alembic.ini 中,则此处及下面都可以不用修改
def get_url():
# 一般的项目中:是将这些配置放在配置文件中,然后导入至此
SQLALCHEMY_DATABASE_URL = "mysql+pymysql://root:xxxxx@192.168.3.33:3306/comment_test"
return SQLALCHEMY_DATABASE_URL
# 修改自动生成的两个函数,以 覆盖alembic.ini 中定义的 url.
def run_migrations_offline():
# url = config.get_main_option("sqlalchemy.url")
url = get_url()
context.configure(
url=url,
...
)
# ...
def run_migrations_online():
# 增加三条语句和删除一条: 目的是 将配置文件中 url替换
configuration = config.get_section(config.config_ini_section) # 1
configuration['sqlalchemy.url'] = get_url() # 2
connectable = engine_from_config(
# config.get_section(config.config_ini_section), #del
configuration, # 3
prefix="sqlalchemy.",
poolclass=pool.NullPool,
)
# ...
sys.path.append:
python 程序中使用 import XXX 时, python 解析器会在当前目录、已安装和第三方模块中搜索 xxx ,如果都搜索不到就会报错.
使用 sys.path.append()方法可以临时添加搜索路径,方便更简洁的 import 其他包和模块。这种方法导入的路径会在 python 程序退出后失效。
4 生成迁移的脚本文件
alembic revision --autogenerate -m "msg"
5 提交此次迁移至数据库
alembic upgrade head
此后每次想要修改数据库表,在 ORM 的类里面修改后,进行 4 , 5 的操作就行了,注意每次生成的迁移文件必须提交,否则会因为版本的不同,而发生错误。
Tips: 这些配置都是基于上面的演示项目而来,所以在很多地方,都不是特别合理。
同时上面 main.py 里面的
models.Base.metadata.create_all(bind=engine)
这条代码,也可以删除不用了
下一篇,将介绍 如何将数据库的表映射至本地,并自动生成 ORM 文件
相关推荐:
来自系列:fastapi评论([[comments.sum]])
[[item.name]] [[item.email]] [[item.date_time]]
[[itemSon.name]] [[itemSon.email]] [[itemSon.date_time]]
回复 @[[itemSon.reply_user.name]]:
加载更多([[item.son.length-2]])...