Pydantic-resolve 使用手册 / Manual

Pydantic-resolve 是一个轻量级的工具库, 用来构建多层嵌套结构的数据。

它能通过上层的数据字段获取并关联下层的数据。 也能在获取数据之后执行post方法进行额外计算。

安装

pip install pydantic-resolve

source: https://github.com/allmonday/pydantic-resolve

我们用FastAPI应用场景来举例, Tasks 接口

这个场景的目的是生成一个 task -> comment -> feedback 的三层嵌套数据

假设我们现在有一个API接口, get_tasks

 1class Task(Base):
 2   __tablename__ = "task"
 3
 4   id: Mapped[int] = mapped_column(primary_key=True)
 5   name: Mapped[str]
 6
 7class TaskSchema(BaseModel):
 8   id: int
 9   name: str
10
11   class Config:
12      orm_mode = True
13
14@app.get('/tasks', response_model=List[TaskSchema])
15async def get_tasks(private:bool= Query(default=True),
16                  session: AsyncSession = Depends(db.get_session)):
17   tasks = (await session.execute(select(md.Task))).scalars().all()
18   return tasks

输出:

[
   { "id": 1, "name": "setup test environment" },
   { "id": 2, "name": "initial project" },
]

通过task_id查找 task 关联的 comments

现在我们需要为每个task 对象关联一些comments, 操作步骤

  1. 添加 dataloader, 传入需要task ids, 返回每个task id 关联的comments

  2. 在Task类型上添加 comments: List[Comment] = []

  3. 使用Resolver执行解析

pydantic-resolve 会自动处理orm对象到schema对象的转换。只要字段内容一致,并且符合pydantic的用法。

 1async def comment_batch_load_fn(task_ids):
 2   async with db.async_session() as session:
 3         res = await session.execute(select(Comment).where(Comment.task_id.in_(task_ids)))
 4         rows = res.scalars().all()
 5         return build_list(rows, task_ids, lambda x: x.task_id)
 6
 7class Comment(Base):
 8   __tablename__ = "comment"
 9
10   id: Mapped[int] = mapped_column(primary_key=True)
11   task_id: Mapped[int] = mapped_column()
12   content: Mapped[str]
13
14class Task(Base):
15   __tablename__ = "task"
16
17   id: Mapped[int] = mapped_column(primary_key=True)
18   name: Mapped[str]
19
20class CommentSchema(BaseModel):
21   id: int
22   task_id: int
23   content: str
24
25   class Config:
26      orm_mode = True
27
28class TaskSchema(BaseModel):
29   id: int
30   name: str
31
32   comments: List[CommentSchema] = []
33   def resolve_comments(self, comment_loader=LoaderDepend(comment_batch_load_fn)):
34      return comment_loader.load(self.id)
35
36   class Config:
37      orm_mode = True
38
39@app.get('/tasks', response_model=List[TaskSchema])
40async def get_tasks(private:bool= Query(default=True),
41                  session: AsyncSession = Depends(db.get_session)):
42   tasks = (await session.execute(select(Task))).scalars().all()
43   tasks = await Resolver().resolve(tasks)
44   return tasks

输出:

[
   { "id": 1, "name": "setup test environment", "comments": [
      { "id": 1, "task_id": 1, "content": "remember to config pipeline" },
      { "id": 2, "task_id": 1, "content": "DBA is OOO" },
   ] },
   { "id": 2, "name": "initial project", "comments": [
      { "id": 3, "task_id": 2, "content": "I need authority" },
   ] },
]

通过 comment_id 为 comment 添加关联的 feedback

我们照着这个模式,继续为每个comment对象关联一些 feedbacks.

  1. 添加 dataloader, 传入需要comment ids, 返回每个comment id 关联的 feedbacks

  2. 在 Comment类型上添加 feedbacks: List[Feedback] = []

 1async def comment_batch_load_fn(task_ids):
 2   async with db.async_session() as session:
 3         res = await session.execute(select(Comment).where(Comment.task_id.in_(task_ids)))
 4         rows = res.scalars().all()
 5         return build_list(rows, task_ids, lambda x: x.task_id)
 6
 7async def feedback_batch_load_fn(comment_ids):
 8    async with db.async_session() as session:
 9         res = await session.execute(select(Feedback)
10            .where(Feedback.comment_id.in_(comment_ids)))
11         rows = res.scalars().all()
12         return build_list(rows, comment_ids, lambda x: x.comment_id)
13
14class Task(Base):
15   __tablename__ = "task"
16
17   id: Mapped[int] = mapped_column(primary_key=True)
18   name: Mapped[str]
19
20class Comment(Base):
21   __tablename__ = "comment"
22
23   id: Mapped[int] = mapped_column(primary_key=True)
24   task_id: Mapped[int] = mapped_column()
25   content: Mapped[str]
26
27class Feedback(Base):
28   __tablename__ = "feedback"
29
30   id: Mapped[int] = mapped_column(primary_key=True)
31   comment_id: Mapped[int] = mapped_column()
32   content: Mapped[str]
33
34class FeedbackSchema(BaseModel):
35   id: int
36   comment_id: int
37   content: str
38
39   class Config:
40      orm_mode = True
41
42class CommentSchema(BaseModel):
43   id: int
44   task_id: int
45   content: str
46   feedbacks: List[FeedbackSchema] = []
47   def resolve_feedbacks(self, feedback_loader=LoaderDepend(feedback_batch_load_fn)):
48      return feedback_loader.load(self.id)
49
50   class Config:
51      orm_mode = True
52
53class TaskSchema(BaseModel):
54   id: int
55   name: str
56
57   comments: List[CommentSchema] = []
58   def resolve_comments(self, comment_loader=LoaderDepend(comment_batch_load_fn)):
59      return comment_loader.load(self.id)
60
61   class Config:
62      orm_mode = True
63
64@app.get('/tasks', response_model=List[TaskSchema])
65async def get_tasks(private:bool= Query(default=True),
66                  session: AsyncSession = Depends(db.get_session)):
67   tasks = (await session.execute(select(Task))).scalars().all()
68   tasks = await Resolver().resolve(tasks)
69   return tasks

输出:

[
   { "id": 1, "name": "setup test environment", "comments": [
      { "id": 1, "task_id": 1, "content": "remember to config pipeline", "feedbacks": [
         { "id": 1, "comment_id": 1, "content": "roger"},
         { "id": 2, "comment_id": 1, "content": "done"},
      ] },
      { "id": 2, "task_id": 1, "content": "DBA is OOO", "feedbacks": [] },
   ] },
   { "id": 2, "name": "initial project", "comments": [
      { "id": 3, "task_id": 2, "content": "I need authority", "feedbacks": [
         { "id": 3, "comment_id": 3, "content": "received"},
         { "id": 4, "comment_id": 3, "content": "granted"},
      ] },
   ] },
]

Attention

从代码上我们能看到,所有的额外关联,都没有对代码的侵入和改动。

其他使用方法:

更多: