跳到主要内容

异步编程与并发调用

本节定位

做 LLM 应用时,很多人第一次的性能瓶颈不是模型不够强,而是:

系统大部分时间都在等。

等接口、等检索、等工具、等数据库。
异步编程就是在解决这种“CPU 没在忙,但任务还卡着”的问题。

学习目标

  • 理解为什么 LLM 应用天然适合异步并发
  • 分清同步调用和异步调用的区别
  • 学会 async / await / gather 的基本用法
  • 理解并发限制和超时控制为什么重要
  • 看懂一个更贴近真实场景的异步调用示例

先建立一张地图

异步编程更适合按“哪里在等、能不能并发、哪里要限流”来理解:

所以这节真正想解决的是:

  • 为什么 LLM 工程的性能问题常常不是算力,而是等待
  • 为什么异步不是魔法提速,而是更聪明地利用等待时间

一、为什么 LLM 工程特别容易遇到“等待”?

1.1 一个真实得不能再真实的场景

你做一个问答助手,一次请求可能要:

  1. 查知识库
  2. 调模型
  3. 再调一个工具

如果每一步都顺序等完再做下一步,整体延迟很容易拉长。

1.2 关键点:很多步骤不是“计算慢”,而是“等待慢”

例如:

  • 网络请求
  • 数据库查询
  • 第三方 API

这些阶段,CPU 很多时候并没有真正忙满。
这就意味着:

可以在等待一个任务的时候,先去做别的任务。

这正是异步编程最有价值的地方。

1.3 一个更适合新人的总类比

你可以把异步编程理解成:

  • 一边烧水,一边切菜

如果你烧水时只是站在锅边发呆,
那很多时间其实被浪费了。
而异步就是在说:

  • 等待期间,先去推进别的任务

这个类比很适合新人,因为它会帮助你先抓住:

  • 异步不是让单个请求“更强”
  • 而是让整体等待“更聪明”

二、同步和异步到底差在哪?

2.1 同步:一个任务做完再做下一个

import time

def task(name, delay):
time.sleep(delay)
return f"{name} done"

start = time.time()
print(task("A", 1))
print(task("B", 1))
print("elapsed =", round(time.time() - start, 2))

这段代码会大约花 2 秒。

2.2 异步:发出去后先别傻等

import asyncio
import time

async def task(name, delay):
await asyncio.sleep(delay)
return f"{name} done"

async def main():
start = time.time()
results = await asyncio.gather(
task("A", 1),
task("B", 1)
)
print(results)
print("elapsed =", round(time.time() - start, 2))

asyncio.run(main())

这一版通常只要大约 1 秒。

2.3 真正的差别是什么?

不是“异步更神秘”,而是:

等待期间,调度器不会傻站着,而会去推进别的协程。


三、asyncawait 到底在表达什么?

3.1 async def

表示:

这是一个协程函数。

它不会立刻像普通函数那样直接完成,而是可以被调度执行。

3.2 await

表示:

这里需要等一个异步结果回来。

但等的这段时间,调度器可以去处理别的协程。

3.3 一个最容易理解的类比

同步像:

  • 做饭时站在锅前傻等水烧开

异步像:

  • 水在烧时,你先去切菜

四、gather 为什么这么常见?

4.1 因为很多 LLM 场景天然就是“并发查几路”

例如:

  • 同时调 3 个检索器
  • 同时请求多个模型候选
  • 同时查几个数据源

这时 asyncio.gather() 很自然。

4.2 一个更贴近 LLM 场景的示例

import asyncio

async def retrieve_docs():
await asyncio.sleep(0.3)
return ["退款政策", "证书说明"]

async def call_model():
await asyncio.sleep(0.5)
return "模型初步回复"

async def fetch_user_profile():
await asyncio.sleep(0.2)
return {"user_level": "beginner"}

async def main():
docs, model_reply, profile = await asyncio.gather(
retrieve_docs(),
call_model(),
fetch_user_profile()
)
print(docs)
print(model_reply)
print(profile)

asyncio.run(main())

这就已经非常像真实应用里“并行查几层信息”的写法了。


五、为什么不能无限并发?

5.1 因为外部系统不是无限扛得住

如果你一口气并发 1000 个请求,可能会遇到:

  • API 限流
  • 数据库被打爆
  • 文件句柄耗尽
  • 上游服务超时

所以异步编程不是“并发越多越好”,而是:

要在吞吐和稳定性之间找平衡。

5.2 用 Semaphore 做并发限制

import asyncio

semaphore = asyncio.Semaphore(3)

async def limited_task(i):
async with semaphore:
await asyncio.sleep(0.2)
return f"task_{i}"

async def main():
results = await asyncio.gather(*(limited_task(i) for i in range(10)))
print(results)

asyncio.run(main())

这个例子表示:

  • 虽然一共发起了 10 个任务
  • 但同一时刻最多只允许 3 个一起跑

5.3 一个很适合初学者先记的判断表

现象更值得先怎么处理
请求很多但主要卡在 I/O先考虑并发
外部服务开始报限流先加 Semaphore
某些请求一直挂住先加 timeout
单个任务本身就算得很重异步不一定是第一解

这个表很适合新人,因为它会把“什么时候该上异步、什么时候该限流”重新变成几个具体判断。


六、超时控制为什么特别重要?

6.1 因为有些请求会“卡死”

真实系统里,如果一个上游服务慢到离谱,而你又没有超时控制,整个请求就可能一直挂住。

6.2 一个最小超时示例

import asyncio

async def slow_task():
await asyncio.sleep(2)
return "done"

async def main():
try:
result = await asyncio.wait_for(slow_task(), timeout=0.5)
print(result)
except asyncio.TimeoutError:
print("task timeout")

asyncio.run(main())

这在工程里非常关键,因为“无限等待”通常比“明确失败”更糟。


七、异步编程在 LLM 工程里的典型使用点

7.1 检索并发

同时查:

  • FAQ
  • 向量库
  • 数据库

7.2 多模型并发

例如:

  • 主模型 + 备用模型
  • 多候选答案并发生成

7.3 工具并发

比如一个 Agent 要同时:

  • 查天气
  • 查用户状态
  • 查订单记录

7.4 日志与监控链路

有些日志和上报也适合异步做,避免堵住主请求。

7.5 第一次把异步放进项目里,最稳的默认顺序

更稳的顺序通常是:

  1. 先找出哪些步骤主要在等 I/O
  2. 先把这些步骤并发起来
  3. 再加 Semaphore 控制并发数
  4. 最后补超时和异常处理

这样会比一上来就把整个项目全改成异步更稳。


八、如果你的目标是“知识库驱动的课件生成助手”,哪些步骤最值得并发?

这类项目里,最容易并发起来的通常不是“最终生成课件”这一步,
而是生成前的几个外部等待动作。

更值得优先考虑并发的通常是:

  • 查内部知识库
  • 补外部资料
  • 读用户画像或配置
  • 预取模板信息

你可以先把它理解成:

并发最值钱的地方,往往是在“收集上下文”阶段。

九、一个更像真实系统的小例子

import asyncio

async def search_kb(query):
await asyncio.sleep(0.3)
return f"知识库结果: {query}"

async def get_user_status(user_id):
await asyncio.sleep(0.2)
return {"user_id": user_id, "progress": 0.15}

async def call_llm(prompt):
await asyncio.sleep(0.4)
return f"LLM 回复: {prompt}"

async def handle_request(query, user_id):
kb_result, user_status = await asyncio.gather(
search_kb(query),
get_user_status(user_id)
)

prompt = f"请根据以下信息回答:{kb_result},用户状态:{user_status}"
answer = await call_llm(prompt)
return answer

print(asyncio.run(handle_request("退款政策是什么", 1)))

这个例子已经很像真实后端:

  • 前半段并发取上下文
  • 后半段再统一送给模型

十、初学者最常踩的坑

10.1 把异步理解成“更快的同步”

异步不是加速魔法,它更像是更聪明的等待方式。

10.2 一上来就无限并发

这很容易把系统压坏。

10.3 没有超时和异常处理

一旦某个任务卡死,整个请求链路就可能拖垮。

如果把它做成项目或系统设计,最值得展示什么

最值得展示的通常不是:

  • “我用了 asyncio”

而是:

  1. 哪些步骤被并发了
  2. 为什么这里值得并发
  3. 限流和超时是怎么设计的
  4. 整体延迟是怎么降下来的

这样别人会更容易看出:

  • 你理解的是异步并发的工程价值
  • 不只是会写语法

小结

这一节最重要的不是背 async / await 语法,而是理解:

异步编程的核心,是把“等待时间”利用起来,让系统在 I/O 密集型场景下更高效、更稳定。

这在 LLM 工程里几乎是绕不开的基本功。


练习

  1. 把本节的并发示例里任务数从 10 增加到 30,并调整 Semaphore 的大小。
  2. handle_request() 里再加一个并发工具调用。
  3. 想一想:为什么异步编程特别适合“多外部依赖”的 LLM 应用?
  4. 用自己的话解释:异步编程为什么不是“让单个任务更快”,而是“让整体等待更聪明”?