Pydantic-resolve 使用手册 / Manual¶
Pydantic-resolve 是一个轻量级的工具库, 用来构建多层嵌套结构的数据。
它能通过上层的数据字段获取并关联下层的数据。 也能在获取数据之后执行post方法进行额外计算。
安装¶
pip install pydantic-resolve
我们用FastAPI应用场景来举例, Tasks 接口¶
这个场景的目的是生成一个 task -> comment -> feedback 的三层嵌套数据
假设我们现在有一个API接口, get_tasks
1class Task(Base):
2 __tablename__ = "task"
3
4 id: Mapped[int] = mapped_column(primary_key=True)
5 name: Mapped[str]
6
7class TaskSchema(BaseModel):
8 id: int
9 name: str
10
11 class Config:
12 orm_mode = True
13
14@app.get('/tasks', response_model=List[TaskSchema])
15async def get_tasks(private:bool= Query(default=True),
16 session: AsyncSession = Depends(db.get_session)):
17 tasks = (await session.execute(select(md.Task))).scalars().all()
18 return tasks
输出:
[
{ "id": 1, "name": "setup test environment" },
{ "id": 2, "name": "initial project" },
]
通过task_id查找 task 关联的 comments¶
现在我们需要为每个task 对象关联一些comments, 操作步骤
添加 dataloader, 传入需要task ids, 返回每个task id 关联的comments
在Task类型上添加 comments: List[Comment] = []
使用Resolver执行解析
pydantic-resolve 会自动处理orm对象到schema对象的转换。只要字段内容一致,并且符合pydantic的用法。
1async def comment_batch_load_fn(task_ids):
2 async with db.async_session() as session:
3 res = await session.execute(select(Comment).where(Comment.task_id.in_(task_ids)))
4 rows = res.scalars().all()
5 return build_list(rows, task_ids, lambda x: x.task_id)
6
7class Comment(Base):
8 __tablename__ = "comment"
9
10 id: Mapped[int] = mapped_column(primary_key=True)
11 task_id: Mapped[int] = mapped_column()
12 content: Mapped[str]
13
14class Task(Base):
15 __tablename__ = "task"
16
17 id: Mapped[int] = mapped_column(primary_key=True)
18 name: Mapped[str]
19
20class CommentSchema(BaseModel):
21 id: int
22 task_id: int
23 content: str
24
25 class Config:
26 orm_mode = True
27
28class TaskSchema(BaseModel):
29 id: int
30 name: str
31
32 comments: List[CommentSchema] = []
33 def resolve_comments(self, comment_loader=LoaderDepend(comment_batch_load_fn)):
34 return comment_loader.load(self.id)
35
36 class Config:
37 orm_mode = True
38
39@app.get('/tasks', response_model=List[TaskSchema])
40async def get_tasks(private:bool= Query(default=True),
41 session: AsyncSession = Depends(db.get_session)):
42 tasks = (await session.execute(select(Task))).scalars().all()
43 tasks = await Resolver().resolve(tasks)
44 return tasks
输出:
[
{ "id": 1, "name": "setup test environment", "comments": [
{ "id": 1, "task_id": 1, "content": "remember to config pipeline" },
{ "id": 2, "task_id": 1, "content": "DBA is OOO" },
] },
{ "id": 2, "name": "initial project", "comments": [
{ "id": 3, "task_id": 2, "content": "I need authority" },
] },
]
通过 comment_id 为 comment 添加关联的 feedback¶
我们照着这个模式,继续为每个comment对象关联一些 feedbacks.
添加 dataloader, 传入需要comment ids, 返回每个comment id 关联的 feedbacks
在 Comment类型上添加 feedbacks: List[Feedback] = []
输出:
Attention
从代码上我们能看到,所有的额外关联,都没有对代码的侵入和改动。