Skip to main content

8.4.2 异步编程与并发调用

本节定位

做 LLM 应用时,很多人第一次的性能瓶颈不是模型不够强,而是:

系统大部分时间都在等。

等接口、等检索、等工具、等数据库。 异步编程就是在解决这种“CPU 没在忙,但任务还卡着”的问题。

学习目标

  • 理解为什么 LLM 应用天然适合异步并发
  • 分清同步调用和异步调用的区别
  • 学会 async / await / gather 的基本用法
  • 理解并发限制和超时控制为什么重要
  • 看懂一个更贴近真实场景的异步调用示例

新人术语桥

在读代码前,先把工程里经常出现的几个词捋清楚会更稳:

术语在本节里的意思为什么重要
I/OInput / Output,输入输出,比如网络请求、数据库查询、文件读取、API 调用这些步骤很多时间都在等待,而不是计算
coroutine协程,可以在 await 处暂停,之后再继续执行它让 Python 等一个任务时,可以先推进别的等待任务
scheduler调度器,事件循环中决定哪个协程继续运行的部分可以把它理解成异步并发里的“交通指挥员”
Semaphore信号量,用来限制同一时间最多运行多少个任务防止应用把 API、数据库或模型服务一下子打爆
timeout超时时间,也就是一个操作最多等多久防止某个上游调用卡住后拖垮整条请求链路

新人可以先这样理解:异步代码不是让某个外部服务本身变快,而是让应用不要把时间浪费在干等上。


先建立一张地图

异步编程更适合按“哪里在等、能不能并发、哪里要限流”来理解:

所以这节真正想解决的是:

  • 为什么 LLM 工程的性能问题常常不是算力,而是等待
  • 为什么异步不是魔法提速,而是更聪明地利用等待时间

为什么 LLM 工程特别容易遇到“等待”?

一个真实得不能再真实的场景

你做一个问答助手,一次请求可能要:

  1. 查知识库
  2. 调模型
  3. 再调一个工具

如果每一步都顺序等完再做下一步,整体延迟很容易拉长。

关键点:很多步骤不是“计算慢”,而是“等待慢”

例如:

  • 网络请求
  • 数据库查询
  • 第三方 API

这些阶段,CPU 很多时候并没有真正忙满。 这就意味着:

可以在等待一个任务的时候,先去做别的任务。

这正是异步编程最有价值的地方。

一个更适合新人的总类比

你可以把异步编程理解成:

  • 一边烧水,一边切菜

如果你烧水时只是站在锅边发呆, 那很多时间其实被浪费了。 而异步就是在说:

  • 等待期间,先去推进别的任务

这个类比很适合新人,因为它会帮助你先抓住:

  • 异步不是让单个请求“更强”
  • 而是让整体等待“更聪明”

同步和异步到底差在哪?

同步:一个任务做完再做下一个

import time

def task(name, delay):
time.sleep(delay)
return f"{name} done"

start = time.time()
print(task("A", 1))
print(task("B", 1))
print("elapsed =", round(time.time() - start, 2))

这段代码会大约花 2 秒。

示例输出:

A done
B done
elapsed = 2.0

异步:发出去后先别傻等

import asyncio
import time

async def task(name, delay):
await asyncio.sleep(delay)
return f"{name} done"

async def main():
start = time.time()
results = await asyncio.gather(
task("A", 1),
task("B", 1)
)
print(results)
print("elapsed =", round(time.time() - start, 2))

asyncio.run(main())

这一版通常只要大约 1 秒。

示例输出:

['A done', 'B done']
elapsed = 1.0

真正的差别是什么?

不是“异步更神秘”,而是:

等待期间,调度器不会傻站着,而会去推进别的协程。


asyncawait 到底在表达什么?

async def

表示:

这是一个协程函数。

它不会立刻像普通函数那样直接完成,而是可以被调度执行。

await

表示:

这里需要等一个异步结果回来。

但等的这段时间,调度器可以去处理别的协程。

一个最容易理解的类比

同步像:

  • 做饭时站在锅前傻等水烧开

异步像:

  • 水在烧时,你先去切菜

gather 为什么这么常见?

因为很多 LLM 场景天然就是“并发查几路”

例如:

  • 同时调 3 个检索器
  • 同时请求多个模型候选
  • 同时查几个数据源

这时 asyncio.gather() 很自然。

一个更贴近 LLM 场景的示例

import asyncio

async def retrieve_docs():
await asyncio.sleep(0.3)
return ["退款政策", "证书说明"]

async def call_model():
await asyncio.sleep(0.5)
return "模型初步回复"

async def fetch_user_profile():
await asyncio.sleep(0.2)
return {"user_level": "beginner"}

async def main():
docs, model_reply, profile = await asyncio.gather(
retrieve_docs(),
call_model(),
fetch_user_profile()
)
print(docs)
print(model_reply)
print(profile)

asyncio.run(main())

预期输出:

['退款政策', '证书说明']
模型初步回复
{'user_level': 'beginner'}

这就已经非常像真实应用里“并行查几层信息”的写法了。


为什么不能无限并发?

因为外部系统不是无限扛得住

如果你一口气并发 1000 个请求,可能会遇到:

  • API 限流
  • 数据库被打爆
  • 文件句柄耗尽
  • 上游服务超时

所以异步编程不是“并发越多越好”,而是:

要在吞吐和稳定性之间找平衡。

Semaphore 做并发限制

import asyncio

semaphore = asyncio.Semaphore(3)

async def limited_task(i):
async with semaphore:
await asyncio.sleep(0.2)
return f"task_{i}"

async def main():
results = await asyncio.gather(*(limited_task(i) for i in range(10)))
print(results)

asyncio.run(main())

预期输出:

['task_0', 'task_1', 'task_2', 'task_3', 'task_4', 'task_5', 'task_6', 'task_7', 'task_8', 'task_9']

这个例子表示:

  • 虽然一共发起了 10 个任务
  • 但同一时刻最多只允许 3 个一起跑

一个很适合初学者先记的判断表

现象更值得先怎么处理
请求很多但主要卡在 I/O先考虑并发
外部服务开始报限流先加 Semaphore
某些请求一直挂住先加 timeout
单个任务本身就算得很重异步不一定是第一解

这个表很适合新人,因为它会把“什么时候该上异步、什么时候该限流”重新变成几个具体判断。

异步并发、Semaphore 与 timeout 控制图

读图提示

异步不是无限并发。图里 gather 负责并发等待,Semaphore 负责限流,timeout 负责不让请求卡死,这三者合在一起才更像真实工程。


超时控制为什么特别重要?

因为有些请求会“卡死”

真实系统里,如果一个上游服务慢到离谱,而你又没有超时控制,整个请求就可能一直挂住。

一个最小超时示例

import asyncio

async def slow_task():
await asyncio.sleep(2)
return "done"

async def main():
try:
result = await asyncio.wait_for(slow_task(), timeout=0.5)
print(result)
except asyncio.TimeoutError:
print("task timeout")

asyncio.run(main())

预期输出:

task timeout

这在工程里非常关键,因为“无限等待”通常比“明确失败”更糟。


异步编程在 LLM 工程里的典型使用点

检索并发

同时查:

  • FAQ
  • 向量库
  • 数据库

多模型并发

例如:

  • 主模型 + 备用模型
  • 多候选答案并发生成

工具并发

比如一个 Agent 要同时:

  • 查天气
  • 查用户状态
  • 查订单记录

日志与监控链路

有些日志和上报也适合异步做,避免堵住主请求。

第一次把异步放进项目里,最稳的默认顺序

更稳的顺序通常是:

  1. 先找出哪些步骤主要在等 I/O
  2. 先把这些步骤并发起来
  3. 再加 Semaphore 控制并发数
  4. 最后补超时和异常处理

这样会比一上来就把整个项目全改成异步更稳。


如果你的目标是“知识库驱动的课件生成助手”,哪些步骤最值得并发?

这类项目里,最容易并发起来的通常不是“最终生成课件”这一步, 而是生成前的几个外部等待动作。

更值得优先考虑并发的通常是:

  • 查内部知识库
  • 补外部资料
  • 读用户画像或配置
  • 预取模板信息

你可以先把它理解成:

并发最值钱的地方,往往是在“收集上下文”阶段。

一个更像真实系统的小例子

import asyncio

async def search_kb(query):
await asyncio.sleep(0.3)
return f"知识库结果: {query}"

async def get_user_status(user_id):
await asyncio.sleep(0.2)
return {"user_id": user_id, "progress": 0.15}

async def call_llm(prompt):
await asyncio.sleep(0.4)
return f"LLM 回复: {prompt}"

async def handle_request(query, user_id):
kb_result, user_status = await asyncio.gather(
search_kb(query),
get_user_status(user_id)
)

prompt = f"请根据以下信息回答:{kb_result},用户状态:{user_status}"
answer = await call_llm(prompt)
return answer

print(asyncio.run(handle_request("退款政策是什么", 1)))

预期输出:

LLM 回复: 请根据以下信息回答:知识库结果: 退款政策是什么,用户状态:{'user_id': 1, 'progress': 0.15}

异步上下文收集运行结果图

读图提示

从上方两条并行线往下读:search_kb()get_user_status() 同时等待,gather 合并两个输出,然后 call_llm() 才使用组合后的上下文。

这个例子已经很像真实后端:

  • 前半段并发取上下文
  • 后半段再统一送给模型

初学者最常踩的坑

把异步理解成“更快的同步”

异步不是加速魔法,它更像是更聪明的等待方式。

一上来就无限并发

这很容易把系统压坏。

没有超时和异常处理

一旦某个任务卡死,整个请求链路就可能拖垮。

如果把它做成项目或系统设计,最值得展示什么

最值得展示的通常不是:

  • “我用了 asyncio”

而是:

  1. 哪些步骤被并发了
  2. 为什么这里值得并发
  3. 限流和超时是怎么设计的
  4. 整体延迟是怎么降下来的

这样别人会更容易看出:

  • 你理解的是异步并发的工程价值
  • 不只是会写语法

小结

这一节最重要的不是背 async / await 语法,而是理解:

异步编程的核心,是把“等待时间”利用起来,让系统在 I/O 密集型场景下更高效、更稳定。

这在 LLM 工程里几乎是绕不开的基本功。


练习

  1. 把本节的并发示例里任务数从 10 增加到 30,并调整 Semaphore 的大小。
  2. handle_request() 里再加一个并发工具调用。
  3. 想一想:为什么异步编程特别适合“多外部依赖”的 LLM 应用?
  4. 用自己的话解释:异步编程为什么不是“让单个任务更快”,而是“让整体等待更聪明”?