E.B.3 並行プログラミング(asyncio を含む)


並行処理は、プログラムの多くの時間が「待ち」であるときに役立ちます。HTTP 呼び出し、DB 呼び出し、ファイル I/O、スクレイピング、RAG 検索、Agent のツール呼び出しなどです。CPU が重い処理を魔法のように速くするものではありません。
準備するもの
- Python 3.10+
- 外部パッケージ不要
pythonを実行できるターミナル
重要用語
- I/O-bound(I/O 待ち中心):大半の時間を外部システム待ちに使う処理。
- CPU-bound(CPU 計算中心):大半の時間を計算に使う処理。
- Coroutine(コルーチン):
awaitで一時停止できる非同期関数。 asyncio.gather:複数の awaitable を実行し、結果を集める。- Semaphore(セマフォ):同時に動くタスク数を制限する。
- Timeout(タイムアウト):一定時間を超えたら待つのをやめる。
制御付き非同期 batch を動かす
async_batch.py を作成します。
import asyncio
async def call_tool(name, delay):
await asyncio.sleep(delay)
return f"{name}:ok"
async def guarded_call(semaphore, name, delay, timeout):
async with semaphore:
try:
return await asyncio.wait_for(call_tool(name, delay), timeout=timeout)
except asyncio.TimeoutError:
return f"{name}:timeout"
async def main():
semaphore = asyncio.Semaphore(2)
results = await asyncio.gather(
guarded_call(semaphore, "search", 0.1, 0.5),
guarded_call(semaphore, "database", 0.2, 0.5),
guarded_call(semaphore, "slow_tool", 1.0, 0.3),
)
print(results)
asyncio.run(main())
実行します。
python async_batch.py
期待される出力:
['search:ok', 'database:ok', 'slow_tool:timeout']
大切なのは gather だけではありません。gather、並行数の上限、タイムアウト処理を組み合わせることです。
上限を変える
この小さな確認コードで、2つの上限を見てみます。
import asyncio
for limit in [2, 1]:
semaphore = asyncio.Semaphore(limit)
print("limit:", limit, "semaphore:", type(semaphore).__name__)
期待される出力:
limit: 2 semaphore: Semaphore
limit: 1 semaphore: Semaphore
最終結果は同じですが、タスクはより保守的に実行されます。実サービスでは、これにより上流 API を急なリクエストから守れます。
asyncio を使う場面
向いているもの:
- 多数のネットワークリクエスト
- 複数のツール呼び出し
- 複数ソースからの RAG 検索
- DB やキュー待ち
最初の選択肢にしにくいもの:
- 重い数値計算
- 大きな画像変換
- 待ち時間のボトルネックがなく、単純さを優先したいコード
よくある間違い
- I/O-bound か確認せず、どこにでも
asyncを付ける。 - 並行数上限なしで
gatherを使う。 - タイムアウトを忘れ、遅い上流一つで全体が詰まる。
- 例外を握りつぶし、どのタスクが失敗したか記録しない。
練習
ツール呼び出しをさらに5つ追加し、Semaphore(3) にします。その後、タイムアウトを 0.15 に下げ、いくつが :timeout になるか数えてください。