Python asyncioによる非同期処理入門

Pythonのasyncioモジュールを使った非同期処理の基礎を、async/await構文、タスクの並行実行、非同期HTTPリクエストの実例とともに解説します。

はじめに

Pythonの asyncio は、非同期I/O処理を実現する標準ライブラリです。ネットワーク通信やファイルI/Oなど、待ち時間(I/Oバウンド)の多い処理を効率的に並行実行できます。

本記事では、async/await 構文の基本からタスクの並行実行、実践的な非同期HTTPリクエストまでを解説します。

同期 vs 非同期

同期処理の問題

import time

def fetch_data(url, delay):
    print(f"Fetching {url}...")
    time.sleep(delay)  # I/O待ちをシミュレート
    print(f"Done {url}")
    return f"data from {url}"

# 逐次実行: 合計6秒
start = time.time()
fetch_data("api/users", 2)
fetch_data("api/posts", 3)
fetch_data("api/comments", 1)
print(f"Total: {time.time() - start:.1f}s")  # 約6秒

非同期処理による改善

import asyncio

async def fetch_data(url, delay):
    print(f"Fetching {url}...")
    await asyncio.sleep(delay)  # 非同期I/O待ち
    print(f"Done {url}")
    return f"data from {url}"

async def main():
    results = await asyncio.gather(
        fetch_data("api/users", 2),
        fetch_data("api/posts", 3),
        fetch_data("api/comments", 1),
    )
    return results

# 並行実行: 約3秒(最も遅いタスクの時間)
import time
start = time.time()
results = asyncio.run(main())
print(f"Total: {time.time() - start:.1f}s")  # 約3秒

コルーチンとasync/await

コルーチンの定義

async def で定義された関数はコルーチン関数であり、呼び出すとコルーチンオブジェクトを返します。

async def greet(name):
    return f"Hello, {name}"

# コルーチンオブジェクトが返る(実行はされない)
coro = greet("Alice")

# 実行するにはawaitまたはasyncio.run()が必要
result = asyncio.run(greet("Alice"))
print(result)  # "Hello, Alice"

awaitの役割

await は非同期処理の完了を待つ式です。await 中にイベントループは他のタスクに制御を移すことができます。

async def process():
    data = await fetch_data("api/users", 1)  # 待機中に他のタスクが実行可能
    return data

タスクの並行実行

asyncio.create_task()

コルーチンをタスクとしてスケジュールし、バックグラウンドで実行を開始します。

async def main():
    task1 = asyncio.create_task(fetch_data("api/users", 2))
    task2 = asyncio.create_task(fetch_data("api/posts", 3))

    # 両タスクはここで並行実行中
    result1 = await task1
    result2 = await task2
    return result1, result2

asyncio.gather()

複数のコルーチンを同時に実行し、すべての結果をまとめて返します。

async def main():
    results = await asyncio.gather(
        fetch_data("api/users", 2),
        fetch_data("api/posts", 3),
        fetch_data("api/comments", 1),
    )
    # resultsは入力順にリストで返される
    return results

asyncio.as_completed()

完了順にイテレートしたい場合に使います。

async def main():
    tasks = [
        fetch_data("api/users", 2),
        fetch_data("api/posts", 3),
        fetch_data("api/comments", 1),
    ]
    for coro in asyncio.as_completed(tasks):
        result = await coro
        print(f"Completed: {result}")
    # 出力順: comments → users → posts(完了が早い順)

実践例:非同期HTTPリクエスト

aiohttp を使った並行HTTPリクエストの例です。

import asyncio
import aiohttp
import time

async def fetch_url(session, url):
    """単一URLの非同期フェッチ"""
    async with session.get(url) as response:
        data = await response.text()
        return {"url": url, "status": response.status, "length": len(data)}

async def fetch_all(urls):
    """複数URLの並行フェッチ"""
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(session, url) for url in urls]
        return await asyncio.gather(*tasks)

# --- 実行 ---
urls = [
    "https://httpbin.org/delay/1",
    "https://httpbin.org/delay/2",
    "https://httpbin.org/delay/1",
    "https://httpbin.org/delay/2",
    "https://httpbin.org/delay/1",
]

start = time.time()
results = asyncio.run(fetch_all(urls))
elapsed = time.time() - start

for r in results:
    print(f"{r['url']}: status={r['status']}, length={r['length']}")
print(f"Total: {elapsed:.1f}s")  # 逐次なら7秒、並行なら約2秒

非同期パターン

Semaphoreによるレート制限

同時接続数を制限して、サーバーに負荷をかけすぎないようにします。

async def fetch_with_limit(session, url, semaphore):
    async with semaphore:  # 同時実行数を制限
        async with session.get(url) as response:
            return await response.text()

async def main():
    semaphore = asyncio.Semaphore(5)  # 最大5並行
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_with_limit(session, url, semaphore) for url in urls]
        return await asyncio.gather(*tasks)

Queueによるプロデューサー・コンシューマー

async def producer(queue):
    for i in range(10):
        await queue.put(f"item-{i}")
        await asyncio.sleep(0.1)
    await queue.put(None)  # 終了シグナル

async def consumer(queue, name):
    while True:
        item = await queue.get()
        if item is None:
            await queue.put(None)  # 他のコンシューマーにも伝搬
            break
        print(f"{name} processed {item}")
        await asyncio.sleep(0.2)

async def main():
    queue = asyncio.Queue(maxsize=5)
    await asyncio.gather(
        producer(queue),
        consumer(queue, "worker-1"),
        consumer(queue, "worker-2"),
    )

エラーハンドリング

gatherでのエラー

return_exceptions=True で例外を結果として返すことができます。

async def risky_task(n):
    if n == 2:
        raise ValueError("Error in task 2")
    return f"result-{n}"

async def main():
    results = await asyncio.gather(
        risky_task(1),
        risky_task(2),
        risky_task(3),
        return_exceptions=True,
    )
    for r in results:
        if isinstance(r, Exception):
            print(f"Error: {r}")
        else:
            print(f"OK: {r}")

よくある落とし穴

落とし穴対策
イベントループのブロックCPU負荷の高い処理は run_in_executor で別スレッドに逃がす
await の付け忘れコルーチンが実行されず RuntimeWarning が出る
同期コードからの呼び出しasyncio.run() で実行、ネストは nest_asyncio
タスクの参照消失create_task の結果を変数に保持する

関連記事

参考文献