Introduction
Python’s asyncio is a standard-library framework for asynchronous I/O. It lets I/O-bound operations, such as network requests, run concurrently in a single thread instead of blocking one after another.
This article covers the basics of async/await syntax, concurrent task execution, and practical async HTTP requests.
Synchronous vs Asynchronous
The Synchronous Problem
```python
import time

def fetch_data(url, delay):
    print(f"Fetching {url}...")
    time.sleep(delay)  # Simulate I/O wait
    print(f"Done {url}")
    return f"data from {url}"

# Sequential: total 6 seconds
start = time.time()
fetch_data("api/users", 2)
fetch_data("api/posts", 3)
fetch_data("api/comments", 1)
print(f"Total: {time.time() - start:.1f}s")  # ~6 seconds
```
Async Improvement
```python
import asyncio
import time

async def fetch_data(url, delay):
    print(f"Fetching {url}...")
    await asyncio.sleep(delay)  # Async I/O wait
    print(f"Done {url}")
    return f"data from {url}"

async def main():
    results = await asyncio.gather(
        fetch_data("api/users", 2),
        fetch_data("api/posts", 3),
        fetch_data("api/comments", 1),
    )
    return results

# Concurrent: ~3 seconds (the time of the slowest task)
start = time.time()
results = asyncio.run(main())
print(f"Total: {time.time() - start:.1f}s")  # ~3 seconds
```
Coroutines and async/await
Defining Coroutines
Functions defined with `async def` are coroutine functions; calling one returns a coroutine object rather than running the body.
```python
import asyncio

async def greet(name):
    return f"Hello, {name}"

# Calling returns a coroutine object (the body has not run yet)
coro = greet("Alice")

# Requires await or asyncio.run() to execute
result = asyncio.run(greet("Alice"))
print(result)  # "Hello, Alice"
```
The Role of await
`await` suspends the current coroutine until the awaited operation completes. While it is suspended, the event loop is free to run other tasks.
```python
async def process():
    data = await fetch_data("api/users", 1)  # Other tasks can run during the wait
    return data
```
Concurrent Task Execution
asyncio.create_task()
`asyncio.create_task()` wraps a coroutine in a Task and schedules it on the event loop, so it starts running in the background right away.
```python
async def main():
    task1 = asyncio.create_task(fetch_data("api/users", 2))
    task2 = asyncio.create_task(fetch_data("api/posts", 3))
    # Both tasks are already running concurrently
    result1 = await task1
    result2 = await task2
    return result1, result2
```
asyncio.gather()
Runs multiple coroutines concurrently and returns all results together.
```python
async def main():
    results = await asyncio.gather(
        fetch_data("api/users", 2),
        fetch_data("api/posts", 3),
        fetch_data("api/comments", 1),
    )
    # Results come back as a list, in input order
    return results
```
asyncio.as_completed()
Iterate in completion order:
```python
async def main():
    tasks = [
        fetch_data("api/users", 2),
        fetch_data("api/posts", 3),
        fetch_data("api/comments", 1),
    ]
    for coro in asyncio.as_completed(tasks):
        result = await coro
        print(f"Completed: {result}")
    # Output order: comments → users → posts (fastest first)
```
Practical Example: Async HTTP Requests
Concurrent HTTP requests using aiohttp:
```python
import asyncio
import time

import aiohttp

async def fetch_url(session, url):
    """Async fetch of a single URL."""
    async with session.get(url) as response:
        data = await response.text()
        return {"url": url, "status": response.status, "length": len(data)}

async def fetch_all(urls):
    """Concurrent fetch of multiple URLs over one shared session."""
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(session, url) for url in urls]
        return await asyncio.gather(*tasks)

# --- Run ---
urls = [
    "https://httpbin.org/delay/1",
    "https://httpbin.org/delay/2",
    "https://httpbin.org/delay/1",
    "https://httpbin.org/delay/2",
    "https://httpbin.org/delay/1",
]

start = time.time()
results = asyncio.run(fetch_all(urls))
elapsed = time.time() - start

for r in results:
    print(f"{r['url']}: status={r['status']}, length={r['length']}")
print(f"Total: {elapsed:.1f}s")  # Sequential: ~7s, Concurrent: ~2s
```
Async Patterns
Rate Limiting with Semaphore
Limit concurrent connections to avoid overloading servers:
```python
async def fetch_with_limit(session, url, semaphore):
    async with semaphore:  # Limit concurrency
        async with session.get(url) as response:
            return await response.text()

async def main():
    semaphore = asyncio.Semaphore(5)  # At most 5 concurrent requests
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_with_limit(session, url, semaphore) for url in urls]
        return await asyncio.gather(*tasks)
```
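Since the aiohttp version above needs a live network to run, here is a self-contained sketch of the same pattern that substitutes `asyncio.sleep` for the HTTP call and counts how many workers are inside the semaphore at once; the `worker` and `state` names are illustrative, not part of any API:

```python
import asyncio

async def worker(n, semaphore, state):
    async with semaphore:  # at most 3 workers may be inside at once
        state["active"] += 1
        state["peak"] = max(state["peak"], state["active"])
        await asyncio.sleep(0.05)  # stand-in for an HTTP request
        state["active"] -= 1

async def main():
    semaphore = asyncio.Semaphore(3)
    state = {"active": 0, "peak": 0}  # track concurrent workers
    await asyncio.gather(*(worker(n, semaphore, state) for n in range(10)))
    return state["peak"]

peak = asyncio.run(main())
print(f"Peak concurrency: {peak}")  # 3: the semaphore caps concurrency at 3
```

Running this shows the peak never exceeds the semaphore's limit, even though all ten workers are scheduled at once.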
Producer-Consumer with Queue
```python
async def producer(queue):
    for i in range(10):
        await queue.put(f"item-{i}")
        await asyncio.sleep(0.1)
    await queue.put(None)  # Sentinel signals "no more items"

async def consumer(queue, name):
    while True:
        item = await queue.get()
        if item is None:
            await queue.put(None)  # Propagate the sentinel to other consumers
            break
        print(f"{name} processed {item}")
        await asyncio.sleep(0.2)

async def main():
    queue = asyncio.Queue(maxsize=5)
    await asyncio.gather(
        producer(queue),
        consumer(queue, "worker-1"),
        consumer(queue, "worker-2"),
    )
```
Error Handling
Errors in gather
Use return_exceptions=True to return exceptions as results:
```python
async def risky_task(n):
    if n == 2:
        raise ValueError("Error in task 2")
    return f"result-{n}"

async def main():
    results = await asyncio.gather(
        risky_task(1),
        risky_task(2),
        risky_task(3),
        return_exceptions=True,
    )
    for r in results:
        if isinstance(r, Exception):
            print(f"Error: {r}")
        else:
            print(f"OK: {r}")
```
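For contrast, without `return_exceptions` the first exception propagates straight out of `gather` to the caller (the other awaitables are not cancelled and still run). A minimal sketch of catching it at the call site:

```python
import asyncio

async def risky_task(n):
    if n == 2:
        raise ValueError("Error in task 2")
    return f"result-{n}"

async def main():
    try:
        # Default behavior: the first raised exception propagates here
        await asyncio.gather(risky_task(1), risky_task(2), risky_task(3))
    except ValueError as e:
        return f"caught: {e}"

outcome = asyncio.run(main())
print(outcome)  # caught: Error in task 2
```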
Common Pitfalls
| Pitfall | Solution |
|---|---|
| Blocking the event loop | Offload CPU-heavy or blocking work with `run_in_executor` |
| Forgetting `await` | The coroutine never executes and Python emits a `RuntimeWarning` |
| Calling from sync code | Use `asyncio.run()`, or `nest_asyncio` for nested loops |
| Losing the task reference | Store the `create_task` result in a variable so the task is not garbage-collected |
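The first pitfall is worth a sketch: a blocking call (here `time.sleep` stands in for CPU-heavy work) can be handed to the default thread-pool executor so the event loop stays responsive. The function names below are illustrative:

```python
import asyncio
import time

def blocking_work(x):
    """A blocking call that would freeze the event loop if run directly."""
    time.sleep(0.1)  # stand-in for CPU-heavy or blocking-I/O work
    return x * 2

async def main():
    loop = asyncio.get_running_loop()
    # Offload each blocking call to the default ThreadPoolExecutor;
    # run_in_executor returns an awaitable future
    results = await asyncio.gather(
        loop.run_in_executor(None, blocking_work, 1),
        loop.run_in_executor(None, blocking_work, 2),
    )
    return results

print(asyncio.run(main()))  # [2, 4]
```

Both calls run in worker threads concurrently, so the total wait is roughly one sleep, not two.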
Related Articles
- How to Overwrite Print Output in Python - Practical Python tips.
- Creating 3D Animations (GIF) with Python Matplotlib - Advanced Python usage.
- Sending Experiment Results to Slack with Python - Calling external APIs from Python, which can be made more efficient with async.