はじめに
Pythonの asyncio は、非同期I/O処理を実現する標準ライブラリです。ネットワーク通信やファイルI/Oなど、待ち時間(I/Oバウンド)の多い処理を効率的に並行実行できます。
本記事では、async/await 構文の基本からタスクの並行実行、実践的な非同期HTTPリクエストまでを解説します。
同期 vs 非同期
同期処理の問題
import time
def fetch_data(url, delay):
print(f"Fetching {url}...")
time.sleep(delay) # I/O待ちをシミュレート
print(f"Done {url}")
return f"data from {url}"
# 逐次実行: 合計6秒
start = time.time()
fetch_data("api/users", 2)
fetch_data("api/posts", 3)
fetch_data("api/comments", 1)
print(f"Total: {time.time() - start:.1f}s") # 約6秒
非同期処理による改善
import asyncio
async def fetch_data(url, delay):
print(f"Fetching {url}...")
await asyncio.sleep(delay) # 非同期I/O待ち
print(f"Done {url}")
return f"data from {url}"
async def main():
results = await asyncio.gather(
fetch_data("api/users", 2),
fetch_data("api/posts", 3),
fetch_data("api/comments", 1),
)
return results
# 並行実行: 約3秒(最も遅いタスクの時間)
import time
start = time.time()
results = asyncio.run(main())
print(f"Total: {time.time() - start:.1f}s") # 約3秒
コルーチンとasync/await
コルーチンの定義
async def で定義された関数はコルーチン関数であり、呼び出すとコルーチンオブジェクトを返します。
async def greet(name):
return f"Hello, {name}"
# コルーチンオブジェクトが返る(実行はされない)
coro = greet("Alice")
# 実行するにはawaitまたはasyncio.run()が必要
result = asyncio.run(greet("Alice"))
print(result) # "Hello, Alice"
awaitの役割
await は非同期処理の完了を待つ式です。await 中にイベントループは他のタスクに制御を移すことができます。
async def process():
data = await fetch_data("api/users", 1) # 待機中に他のタスクが実行可能
return data
タスクの並行実行
asyncio.create_task()
コルーチンをタスクとしてスケジュールし、バックグラウンドで実行を開始します。
async def main():
task1 = asyncio.create_task(fetch_data("api/users", 2))
task2 = asyncio.create_task(fetch_data("api/posts", 3))
# 両タスクはここで並行実行中
result1 = await task1
result2 = await task2
return result1, result2
asyncio.gather()
複数のコルーチンを同時に実行し、すべての結果をまとめて返します。
async def main():
results = await asyncio.gather(
fetch_data("api/users", 2),
fetch_data("api/posts", 3),
fetch_data("api/comments", 1),
)
# resultsは入力順にリストで返される
return results
asyncio.as_completed()
完了順にイテレートしたい場合に使います。
async def main():
tasks = [
fetch_data("api/users", 2),
fetch_data("api/posts", 3),
fetch_data("api/comments", 1),
]
for coro in asyncio.as_completed(tasks):
result = await coro
print(f"Completed: {result}")
# 出力順: comments → users → posts(完了が早い順)
実践例:非同期HTTPリクエスト
aiohttp を使った並行HTTPリクエストの例です。
import asyncio
import aiohttp
import time
async def fetch_url(session, url):
"""単一URLの非同期フェッチ"""
async with session.get(url) as response:
data = await response.text()
return {"url": url, "status": response.status, "length": len(data)}
async def fetch_all(urls):
"""複数URLの並行フェッチ"""
async with aiohttp.ClientSession() as session:
tasks = [fetch_url(session, url) for url in urls]
return await asyncio.gather(*tasks)
# --- 実行 ---
urls = [
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/2",
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/2",
"https://httpbin.org/delay/1",
]
start = time.time()
results = asyncio.run(fetch_all(urls))
elapsed = time.time() - start
for r in results:
print(f"{r['url']}: status={r['status']}, length={r['length']}")
print(f"Total: {elapsed:.1f}s") # 逐次なら7秒、並行なら約2秒
非同期パターン
Semaphoreによるレート制限
同時接続数を制限して、サーバーに負荷をかけすぎないようにします。
async def fetch_with_limit(session, url, semaphore):
async with semaphore: # 同時実行数を制限
async with session.get(url) as response:
return await response.text()
async def main():
semaphore = asyncio.Semaphore(5) # 最大5並行
async with aiohttp.ClientSession() as session:
tasks = [fetch_with_limit(session, url, semaphore) for url in urls]
return await asyncio.gather(*tasks)
Queueによるプロデューサー・コンシューマー
async def producer(queue):
for i in range(10):
await queue.put(f"item-{i}")
await asyncio.sleep(0.1)
await queue.put(None) # 終了シグナル
async def consumer(queue, name):
while True:
item = await queue.get()
if item is None:
await queue.put(None) # 他のコンシューマーにも伝搬
break
print(f"{name} processed {item}")
await asyncio.sleep(0.2)
async def main():
queue = asyncio.Queue(maxsize=5)
await asyncio.gather(
producer(queue),
consumer(queue, "worker-1"),
consumer(queue, "worker-2"),
)
エラーハンドリング
gatherでのエラー
return_exceptions=True で例外を結果として返すことができます。
async def risky_task(n):
if n == 2:
raise ValueError("Error in task 2")
return f"result-{n}"
async def main():
results = await asyncio.gather(
risky_task(1),
risky_task(2),
risky_task(3),
return_exceptions=True,
)
for r in results:
if isinstance(r, Exception):
print(f"Error: {r}")
else:
print(f"OK: {r}")
よくある落とし穴
| 落とし穴 | 対策 |
|---|---|
| イベントループのブロック | CPU負荷の高い処理は run_in_executor で別スレッドに逃がす |
await の付け忘れ | コルーチンが実行されず RuntimeWarning が出る |
| 同期コードからの呼び出し | asyncio.run() で実行、ネストは nest_asyncio |
| タスクの参照消失 | create_task の結果を変数に保持する |
関連記事
- pythonでprintの上書きをする方法 - Pythonの実践的なTipsを紹介しています。
- PythonのMatplotlibで3Dアニメーション(GIF)を作成する方法 - Pythonの応用的な使い方を紹介しています。
- PythonでSlackに実験結果を通知する方法 - 外部APIを叩く実例を紹介しています。非同期処理と組み合わせることで効率的な通知が可能です。