Introduction to Asynchronous Programming with Python asyncio

Fundamentals of Python asyncio with async/await syntax, concurrent task execution, and practical async HTTP request examples.

Introduction

Python’s asyncio is a standard-library module for asynchronous I/O. Using a single-threaded event loop, it runs I/O-bound work such as network communication concurrently and efficiently.

This article covers the basics of async/await syntax, concurrent task execution, and practical async HTTP requests.

Synchronous vs Asynchronous

The Synchronous Problem

import time

def fetch_data(url, delay):
    print(f"Fetching {url}...")
    time.sleep(delay)  # Simulate I/O wait
    print(f"Done {url}")
    return f"data from {url}"

# Sequential: total 6 seconds
start = time.time()
fetch_data("api/users", 2)
fetch_data("api/posts", 3)
fetch_data("api/comments", 1)
print(f"Total: {time.time() - start:.1f}s")  # ~6 seconds

Async Improvement

import asyncio

async def fetch_data(url, delay):
    print(f"Fetching {url}...")
    await asyncio.sleep(delay)  # Async I/O wait
    print(f"Done {url}")
    return f"data from {url}"

async def main():
    results = await asyncio.gather(
        fetch_data("api/users", 2),
        fetch_data("api/posts", 3),
        fetch_data("api/comments", 1),
    )
    return results

# Concurrent: ~3 seconds (time of slowest task)
import time
start = time.time()
results = asyncio.run(main())
print(f"Total: {time.time() - start:.1f}s")  # ~3 seconds

Coroutines and async/await

Defining Coroutines

Functions defined with async def are coroutine functions that return coroutine objects when called.

import asyncio

async def greet(name):
    return f"Hello, {name}"

# Returns a coroutine object (not executed)
coro = greet("Alice")

# Requires await or asyncio.run() to execute
result = asyncio.run(greet("Alice"))
print(result)  # "Hello, Alice"

The Role of await

await suspends the current coroutine until the awaited operation completes. While it is suspended, the event loop is free to run other tasks.

async def process():
    data = await fetch_data("api/users", 1)  # Other tasks can run during wait
    return data
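The control transfer is easiest to see with two coroutines that yield on every iteration. A minimal sketch (worker and the shared order list are illustrative names, not part of the example above):

```python
import asyncio

order = []

async def worker(name, steps):
    for i in range(steps):
        order.append(f"{name}{i}")
        await asyncio.sleep(0)  # yield control back to the event loop

async def main():
    # Both workers share one thread; every await is a switch point.
    await asyncio.gather(worker("A", 3), worker("B", 3))

asyncio.run(main())
print(order)  # ['A0', 'B0', 'A1', 'B1', 'A2', 'B2']
```

Without the await inside the loop, worker A would run to completion before B even started: cooperative scheduling only switches at await points.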

Concurrent Task Execution

asyncio.create_task()

Wraps a coroutine in a Task and schedules it on the event loop, so it starts running in the background right away.

async def main():
    task1 = asyncio.create_task(fetch_data("api/users", 2))
    task2 = asyncio.create_task(fetch_data("api/posts", 3))

    # Both tasks running concurrently
    result1 = await task1
    result2 = await task2
    return result1, result2
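Note the difference from awaiting coroutines directly: without create_task(), the second coroutine only starts after the first finishes. A rough timing sketch (fetch and the 0.2-second delays are illustrative):

```python
import asyncio
import time

async def fetch(delay):
    await asyncio.sleep(delay)
    return delay

async def sequential():
    # Awaiting bare coroutines runs them one after another (~0.4s total).
    return await fetch(0.2) + await fetch(0.2)

async def concurrent():
    # create_task() schedules both immediately, so they overlap (~0.2s total).
    t1 = asyncio.create_task(fetch(0.2))
    t2 = asyncio.create_task(fetch(0.2))
    return await t1 + await t2

start = time.time()
asyncio.run(sequential())
print(f"sequential: {time.time() - start:.1f}s")

start = time.time()
asyncio.run(concurrent())
print(f"concurrent: {time.time() - start:.1f}s")
```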

asyncio.gather()

Runs multiple coroutines concurrently and returns all results together.

async def main():
    results = await asyncio.gather(
        fetch_data("api/users", 2),
        fetch_data("api/posts", 3),
        fetch_data("api/comments", 1),
    )
    # results returned as a list in input order
    return results

asyncio.as_completed()

Iterate in completion order:

async def main():
    tasks = [
        fetch_data("api/users", 2),
        fetch_data("api/posts", 3),
        fetch_data("api/comments", 1),
    ]
    for coro in asyncio.as_completed(tasks):
        result = await coro
        print(f"Completed: {result}")
    # Output order: comments → users → posts (fastest first)

Practical Example: Async HTTP Requests

Concurrent HTTP requests using the third-party aiohttp library (install with pip install aiohttp):

import asyncio
import aiohttp
import time

async def fetch_url(session, url):
    """Async fetch of a single URL"""
    async with session.get(url) as response:
        data = await response.text()
        return {"url": url, "status": response.status, "length": len(data)}

async def fetch_all(urls):
    """Concurrent fetch of multiple URLs"""
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(session, url) for url in urls]
        return await asyncio.gather(*tasks)

# --- Run ---
urls = [
    "https://httpbin.org/delay/1",
    "https://httpbin.org/delay/2",
    "https://httpbin.org/delay/1",
    "https://httpbin.org/delay/2",
    "https://httpbin.org/delay/1",
]

start = time.time()
results = asyncio.run(fetch_all(urls))
elapsed = time.time() - start

for r in results:
    print(f"{r['url']}: status={r['status']}, length={r['length']}")
print(f"Total: {elapsed:.1f}s")  # Sequential: 7s, Concurrent: ~2s

Async Patterns

Rate Limiting with Semaphore

Limit concurrent connections to avoid overloading servers:

async def fetch_with_limit(session, url, semaphore):
    async with semaphore:  # Limit concurrency
        async with session.get(url) as response:
            return await response.text()

async def main():
    semaphore = asyncio.Semaphore(5)  # Max 5 concurrent
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_with_limit(session, url, semaphore) for url in urls]
        return await asyncio.gather(*tasks)

Producer-Consumer with Queue

async def producer(queue):
    for i in range(10):
        await queue.put(f"item-{i}")
        await asyncio.sleep(0.1)
    await queue.put(None)  # Sentinel

async def consumer(queue, name):
    while True:
        item = await queue.get()
        if item is None:
            await queue.put(None)  # Propagate to other consumers
            break
        print(f"{name} processed {item}")
        await asyncio.sleep(0.2)

async def main():
    queue = asyncio.Queue(maxsize=5)
    await asyncio.gather(
        producer(queue),
        consumer(queue, "worker-1"),
        consumer(queue, "worker-2"),
    )

Error Handling

Errors in gather

Use return_exceptions=True to return exceptions as results:

async def risky_task(n):
    if n == 2:
        raise ValueError("Error in task 2")
    return f"result-{n}"

async def main():
    results = await asyncio.gather(
        risky_task(1),
        risky_task(2),
        risky_task(3),
        return_exceptions=True,
    )
    for r in results:
        if isinstance(r, Exception):
            print(f"Error: {r}")
        else:
            print(f"OK: {r}")
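For contrast, without return_exceptions=True the first exception propagates out of the await, so it must be caught around the gather() call itself. A minimal sketch of the same idea (risky() is a stand-in):

```python
import asyncio

async def risky(n):
    if n == 2:
        raise ValueError("boom")
    return n

async def main():
    try:
        await asyncio.gather(risky(1), risky(2), risky(3))
    except ValueError as e:
        return f"caught: {e}"

print(asyncio.run(main()))  # caught: boom
```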

Common Pitfalls

Pitfall | Solution
------- | --------
Blocking the event loop | Offload CPU-heavy work with run_in_executor
Forgetting await | The coroutine never executes; a RuntimeWarning is raised
Calling from sync code | Use asyncio.run(), or nest_asyncio for nested loops
Task reference lost | Store the create_task() result in a variable
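The first pitfall deserves an example. A blocking call such as time.sleep() freezes the entire event loop; a minimal sketch of offloading it with run_in_executor (passing None selects the default ThreadPoolExecutor; blocking_io is an illustrative name):

```python
import asyncio
import time

def blocking_io():
    # A synchronous call: run directly inside a coroutine,
    # it would stall every other task on the loop.
    time.sleep(0.2)
    return "done"

async def main():
    loop = asyncio.get_running_loop()
    # Run the blocking function in a worker thread,
    # keeping the event loop free for other tasks meanwhile.
    result = await loop.run_in_executor(None, blocking_io)
    return result

print(asyncio.run(main()))  # done
```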
