To set up asyncio for bulk geocoding: initialize an asyncio.Semaphore to cap concurrent outbound requests, route all HTTP traffic through a single aiohttp.ClientSession for connection reuse, and dispatch your address batch with asyncio.gather() wrapped in chunk loops. This is covered in full under Building Async Geocoding Requests in Python — the guide below gives you the precise configuration, component-by-component rationale, and production edge-case handling.
Why Asyncio Changes the Economics of Bulk Geocoding
Bulk geocoding is I/O-bound, not CPU-bound. Each address resolution spends most of its wall-clock time waiting: DNS resolution, TLS negotiation, provider-side lookup, and network transit. A synchronous requests loop leaves the CPU idle during every one of those waits. With 50,000 addresses at an average provider latency of 120 ms, a serial pipeline takes roughly 100 minutes. The same workload through a properly tuned async pipeline — 20 concurrent requests — completes in about 5 minutes.
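The figures above can be checked with a few lines of arithmetic. This is a back-of-envelope sketch only: it assumes a constant 120 ms latency and ignores rate limits, retries, and connection setup, and the function name `estimated_minutes` is illustrative.

```python
def estimated_minutes(n_addresses: int, latency_s: float, concurrency: int = 1) -> float:
    """Idealised wall-clock time: total waiting time shared across concurrent slots."""
    return n_addresses * latency_s / concurrency / 60


# One request at a time versus 20 requests in flight.
serial = estimated_minutes(50_000, 0.120)
pipelined = estimated_minutes(50_000, 0.120, concurrency=20)
print(f"serial: {serial:.0f} min, 20 concurrent: {pipelined:.0f} min")
```

Real runs will land above the idealised figure, since provider quotas and retries both add time.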
The diagram below shows how the event loop multiplexes network waits across coroutines without spawning threads:
The Four Configuration Pillars
A production asyncio geocoding setup rests on four tightly coupled components. Omitting any one of them risks provider bans, memory exhaustion, or silent data loss.
1. asyncio.Semaphore — Concurrency Gate
asyncio.Semaphore(n) allows at most n coroutines to hold the semaphore simultaneously. Any coroutine that reaches async with semaphore while all n slots are occupied suspends in the semaphore’s wait queue until a slot is released, leaving the event loop free to run other coroutines. This is the primary mechanism for keeping the number of in-flight requests within what your provider tolerates.
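The gating behaviour can be observed without any network traffic. The sketch below is illustrative: `worker` and `peak_concurrency` are made-up names, and a short asyncio.sleep stands in for the HTTP round trip.

```python
import asyncio


async def worker(sem: asyncio.Semaphore, state: dict) -> None:
    async with sem:  # suspends here while every slot is taken
        state["active"] += 1
        state["peak"] = max(state["peak"], state["active"])
        await asyncio.sleep(0.01)  # stand-in for the HTTP round trip
        state["active"] -= 1


async def peak_concurrency(n_workers: int, limit: int) -> int:
    """Run n_workers coroutines and report how many held the semaphore at once."""
    sem = asyncio.Semaphore(limit)
    state = {"active": 0, "peak": 0}
    await asyncio.gather(*(worker(sem, state) for _ in range(n_workers)))
    return state["peak"]


if __name__ == "__main__":
    print(asyncio.run(peak_concurrency(n_workers=50, limit=5)))
```

With 50 workers and a limit of 5, the reported peak never exceeds 5, even though all 50 coroutines are scheduled at once.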
A semaphore is not a rate limiter. It limits concurrent requests in flight, not requests per second. If your provider allows 50 req/sec and each request completes in 80 ms, a semaphore of 4 is enough to saturate that quota (50 req/s × 0.08 s = 4 in flight). If latency is variable (40–200 ms), start at 15–20 and watch for 429 responses.
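The sizing arithmetic above is an application of Little's law (requests in flight = request rate × latency). A minimal sketch, with hypothetical helper names, shows both directions of the calculation:

```python
import math


def slots_to_saturate(quota_rps: float, latency_s: float) -> int:
    """In-flight requests needed to reach quota_rps at a given latency."""
    # Round first so floating-point noise cannot push ceil() up by one.
    return math.ceil(round(quota_rps * latency_s, 9))


def throughput_at(slots: int, latency_s: float) -> float:
    """Requests per second that a fixed number of busy slots produces."""
    return slots / latency_s


print(slots_to_saturate(50, 0.08))   # slots needed at 80 ms
print(throughput_at(20, 0.04))       # rate produced by 20 slots at 40 ms
```

The second call illustrates why the semaphore alone is not a rate limiter: at the fast end of a 40–200 ms latency range, 20 busy slots would issue far more than 50 req/s, which is why watching for 429 responses matters.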
2. aiohttp.ClientSession — Connection Pool
A single ClientSession maintains a pool of persistent TCP connections. Creating a new session per coroutine forces a fresh TLS handshake for every address, adding 50–150 ms of overhead per request and risking ephemeral port exhaustion on high-throughput hosts.
Tune TCPConnector for your use case:
```python
import aiohttp

connector = aiohttp.TCPConnector(
    limit=100,             # max total open sockets
    limit_per_host=30,     # max sockets to any single provider host
    ttl_dns_cache=300,     # cache DNS results for 5 minutes
    keepalive_timeout=30,  # keep idle connections alive 30 s
)
session = aiohttp.ClientSession(
    connector=connector,
    timeout=aiohttp.ClientTimeout(total=10.0),
)
```
Set limit_per_host to match your provider’s documented connection limit. Exceeding it triggers connection resets rather than 429s, making the root cause harder to diagnose.
3. Exponential Backoff — Retry Logic
HTTP 429 and 5xx responses are expected in high-throughput runs. The correct response is to pause that coroutine and retry, not to fail the entire batch. Cap the sleep duration so that a single address cannot stall its coroutine for an unreasonable time after repeated failures:
```python
import asyncio


async def backoff_sleep(attempt: int, base: float = 1.0, cap: float = 30.0) -> None:
    # Delay doubles with each attempt and never exceeds cap.
    delay = min(base * (2 ** attempt), cap)
    await asyncio.sleep(delay)
```
Using asyncio.sleep (not time.sleep) is critical — time.sleep blocks the entire event loop thread, freezing all other concurrent coroutines.
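To see what the cap does in practice, the delay formula can be evaluated on its own. The helper `backoff_delay` below is an illustrative variant that returns the delay instead of sleeping; it uses the same base and cap defaults as backoff_sleep.

```python
def backoff_delay(attempt: int, base: float = 1.0, cap: float = 30.0) -> float:
    """Same formula as backoff_sleep, returned rather than slept."""
    return min(base * (2 ** attempt), cap)


# Delays for attempts 0 through 6.
schedule = [backoff_delay(a) for a in range(7)]
print(schedule)  # [1.0, 2.0, 4.0, 8.0, 16.0, 30.0, 30.0]
```

From the sixth attempt onward the uncapped value (32 s, then 64 s) is clamped to 30 s.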
4. Chunked asyncio.gather — Memory Control
asyncio.gather(*tasks) schedules all tasks concurrently and returns results in input order. For a list of 200,000 addresses, scheduling everything at once keeps 200,000 task objects, their pending results, and any buffered responses in memory together, and no result is available to persist until the whole batch finishes. Chunking bounds memory to a predictable footprint:
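```python
import asyncio


async def gather_chunks(items, worker, chunk_size: int = 2000):
    """Run worker(item) for every item, at most chunk_size at a time."""
    results = []
    for i in range(0, len(items), chunk_size):
        # Build coroutines per chunk so only chunk_size of them exist at once.
        chunk = [worker(item) for item in items[i : i + chunk_size]]
        chunk_results = await asyncio.gather(*chunk, return_exceptions=True)
        results.extend(chunk_results)
    return results
```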
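Always pass return_exceptions=True. Without it, the first exception raised by any task (a single provider timeout, for example) propagates immediately to the caller awaiting gather. The remaining tasks are not cancelled and keep running in the background, but their results never reach your code, so the whole chunk is effectively lost.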
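A small sketch of the pattern, using a made-up `lookup` coroutine in place of a real geocoding call (the address "bad" simulates a timeout), shows how successes and failures can be separated after the gather:

```python
import asyncio
from typing import List, Tuple


async def lookup(address: str) -> str:
    # Hypothetical stand-in for a geocoding request.
    await asyncio.sleep(0)
    if address == "bad":
        raise asyncio.TimeoutError(f"timed out: {address}")
    return address.upper()


async def run_chunk(addresses: List[str]) -> Tuple[List[str], List[Tuple[str, BaseException]]]:
    """Gather one chunk and split it into successes and (address, error) pairs."""
    outcomes = await asyncio.gather(
        *(lookup(a) for a in addresses), return_exceptions=True
    )
    ok, failed = [], []
    for addr, outcome in zip(addresses, outcomes):
        if isinstance(outcome, BaseException):
            failed.append((addr, outcome))
        else:
            ok.append(outcome)
    return ok, failed


if __name__ == "__main__":
    print(asyncio.run(run_chunk(["a", "bad", "c"])))
```

Because gather returns outcomes in input order, zipping them with the input list is enough to recover which address each exception belongs to.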
Complete Production Implementation
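The following class wires all four components into a usable geocoder that you can copy and adapt. The provider URLs and response field names are placeholders for your own providers. It returns a list of GeocodeResult dataclasses preserving input order, and records which provider answered each address for auditing.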
```python
import asyncio
import logging
from dataclasses import dataclass
from typing import List, Optional

import aiohttp

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s | %(levelname)s | %(message)s",
)
logger = logging.getLogger(__name__)


@dataclass
class GeocodeResult:
    address: str
    lat: Optional[float] = None
    lon: Optional[float] = None
    provider: Optional[str] = None
    success: bool = False
    attempts: int = 0


class AsyncGeocoder:
    """
    Async geocoder with semaphore-gated concurrency, connection pooling,
    exponential backoff retries, and a silent secondary-provider fallback.
    """

    PRIMARY_URL = "https://api.primary-geocoder.example/v1/geocode"
    FALLBACK_URL = "https://api.secondary-geocoder.example/lookup"

    def __init__(
        self,
        max_concurrency: int = 15,
        timeout: float = 10.0,
        max_retries: int = 3,
        chunk_size: int = 2000,
    ) -> None:
        self.semaphore = asyncio.Semaphore(max_concurrency)
        self.timeout = aiohttp.ClientTimeout(total=timeout)
        self.max_retries = max_retries
        self.chunk_size = chunk_size
        self.session: Optional[aiohttp.ClientSession] = None

    async def __aenter__(self) -> "AsyncGeocoder":
        connector = aiohttp.TCPConnector(
            limit=100,
            limit_per_host=30,
            ttl_dns_cache=300,
            keepalive_timeout=30,
        )
        self.session = aiohttp.ClientSession(
            connector=connector,
            timeout=self.timeout,
        )
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        if self.session:
            await self.session.close()

    async def _try_primary(self, address: str) -> Optional[GeocodeResult]:
        params = {"q": address, "format": "json"}
        async with self.session.get(self.PRIMARY_URL, params=params) as resp:
            if resp.status == 200:
                data = await resp.json()
                results = data.get("results") or []
                if results:
                    return GeocodeResult(
                        address=address,
                        lat=float(results[0]["lat"]),
                        lon=float(results[0]["lon"]),
                        provider="primary",
                        success=True,
                    )
            elif resp.status == 429 or resp.status >= 500:
                # Retryable status: raise so _geocode_single can back off.
                raise aiohttp.ClientResponseError(
                    resp.request_info, resp.history, status=resp.status
                )
        return None  # 200 with empty results, or a non-retryable status

    async def _try_fallback(self, address: str) -> Optional[GeocodeResult]:
        params = {"address": address}
        async with self.session.get(self.FALLBACK_URL, params=params) as resp:
            if resp.status == 200:
                data = await resp.json()
                loc = data.get("location")
                if loc:
                    return GeocodeResult(
                        address=address,
                        lat=float(loc["lat"]),
                        lon=float(loc["lon"]),
                        provider="fallback",
                        success=True,
                    )
            elif resp.status == 429 or resp.status >= 500:
                raise aiohttp.ClientResponseError(
                    resp.request_info, resp.history, status=resp.status
                )
        return None

    async def _geocode_single(self, address: str) -> GeocodeResult:
        last_exc: Optional[Exception] = None
        for attempt in range(self.max_retries):
            try:
                # Hold a slot only while requests are in flight, never while sleeping.
                async with self.semaphore:
                    result = await self._try_primary(address)
                    if result is None:
                        # Primary returned no results: try fallback
                        result = await self._try_fallback(address)
                if result is None:
                    # Both providers answered without a match; an immediate
                    # retry would return the same empty result, so stop here.
                    logger.warning("No match for '%s' from either provider.", address)
                    return GeocodeResult(address=address, attempts=attempt + 1)
                result.attempts = attempt + 1
                return result
            except aiohttp.ClientResponseError as exc:
                last_exc = exc
                if exc.status == 429 or exc.status >= 500:
                    delay = min(1.0 * (2 ** attempt), 30.0)
                    logger.warning(
                        "HTTP %d from provider (attempt %d/%d). Sleeping %.1fs.",
                        exc.status, attempt + 1, self.max_retries, delay,
                    )
                    await asyncio.sleep(delay)
            except (aiohttp.ClientError, asyncio.TimeoutError) as exc:
                delay = min(1.0 * (2 ** attempt), 30.0)
                logger.debug(
                    "Network error for '%s' (attempt %d): %s. Retrying in %.1fs.",
                    address, attempt + 1, exc, delay,
                )
                await asyncio.sleep(delay)
                last_exc = exc
        logger.warning("Failed to geocode '%s' after %d attempts: %s",
                       address, self.max_retries, last_exc)
        return GeocodeResult(address=address, success=False,
                             attempts=self.max_retries)

    async def geocode_batch(self, addresses: List[str]) -> List[GeocodeResult]:
        """Geocode a list of addresses, chunked to bound memory usage."""
        all_results: List[GeocodeResult] = []
        total = len(addresses)
        for i in range(0, total, self.chunk_size):
            chunk = addresses[i : i + self.chunk_size]
            tasks = [self._geocode_single(addr) for addr in chunk]
            chunk_results = await asyncio.gather(*tasks, return_exceptions=True)
            for addr, res in zip(chunk, chunk_results):
                # BaseException also covers a cancelled task's CancelledError.
                if isinstance(res, BaseException):
                    logger.error("Unexpected error for '%s': %s", addr, res)
                    all_results.append(GeocodeResult(address=addr, success=False))
                else:
                    all_results.append(res)
            logger.info(
                "Chunk %d–%d complete (%d/%d addresses)",
                i + 1, min(i + self.chunk_size, total), min(i + self.chunk_size, total), total,
            )
        return all_results


# --- pandas variant ---
# For DataFrame workflows, apply geocoding as a batch on the address column:
#
# import asyncio
# import pandas as pd
#
# async def geocode_dataframe(df: pd.DataFrame, address_col: str) -> pd.DataFrame:
#     async with AsyncGeocoder(max_concurrency=15) as geocoder:
#         results = await geocoder.geocode_batch(df[address_col].tolist())
#     # Reuse df's index so the join aligns rows even when the index is not 0..n-1.
#     result_df = pd.DataFrame([vars(r) for r in results], index=df.index)
#     return df.join(result_df[["lat", "lon", "provider", "success"]])
```
Entrypoint:
```python
async def main() -> None:
    addresses = [
        "1600 Amphitheatre Pkwy, Mountain View, CA 94043",
        "350 5th Ave, New York, NY 10118",
        "221B Baker St, London, NW1 6XE",
    ]
    async with AsyncGeocoder(max_concurrency=10, chunk_size=1000) as geocoder:
        results = await geocoder.geocode_batch(addresses)
    for r in results:
        status = f"{r.lat:.5f}, {r.lon:.5f} [{r.provider}]" if r.success else "FAILED"
        print(f"{r.address} → {status}")


if __name__ == "__main__":
    asyncio.run(main())
```
Configuration Reference
| Parameter | Recommended range | Effect when too low | Effect when too high |
|---|---|---|---|
| max_concurrency (Semaphore) | 10–50 | Underutilises provider quota; slow throughput | Triggers 429s; exhausts file descriptors |
| TCPConnector limit | 100–300 | Connection queuing; increased latency | OS socket limit hit; OSError: [Errno 24] |
| limit_per_host | 20–50 | Under-uses keep-alive pool | Provider resets connections |
| ClientTimeout(total) | 8–15 s | Aborts valid slow responses | Hung connections hold semaphore slots for longer |
| chunk_size | 1,000–5,000 | Excessive overhead; slow progress logging | Memory pressure; no intermediate checkpointing |
| max_retries | 3–5 | Drops addresses on transient failures | Stalls pipeline on unreachable providers |
Edge Cases and Failure Modes
Empty-Result 200 Responses
Geocoding providers frequently return HTTP 200 with an empty results array for ambiguous or unstructured input. These are not network errors, so standard exception handling will not catch them. The _try_primary method above checks results explicitly and returns None when it is empty, at which point _geocode_single tries the fallback provider. Treat empty-result 200s as soft failures and log the raw address for a separate normalization pass before resubmission.
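One way to route soft failures into a normalization pass is to record why each address failed. The sketch below is an assumption-laden illustration: the `Outcome` record and its `failure_reason` field are hypothetical and are not part of the GeocodeResult dataclass above, which does not currently store a failure reason.

```python
from dataclasses import dataclass
from typing import Iterable, List, Optional, Tuple


@dataclass
class Outcome:
    """Hypothetical result record carrying the reason for a failure."""
    address: str
    success: bool
    failure_reason: Optional[str] = None  # assumed values: "empty_result", "network"


def split_failures(outcomes: Iterable[Outcome]) -> Tuple[List[str], List[str]]:
    """Return (soft, hard): addresses to normalise vs. addresses to retry later."""
    soft: List[str] = []
    hard: List[str] = []
    for o in outcomes:
        if o.success:
            continue
        (soft if o.failure_reason == "empty_result" else hard).append(o.address)
    return soft, hard
```

Addresses in the soft list are candidates for cleaning and resubmission; those in the hard list failed for transport reasons and gain nothing from normalization.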
Semaphore Starvation Under High Latency
If provider latency spikes to 5–8 seconds (common during peak hours), all max_concurrency semaphore slots fill quickly. New coroutines queue behind them, and the effective throughput collapses. Monitor p95 latency in your logs; if it exceeds 3× the normal value, temporarily reduce max_concurrency by half to drain the backlog before ramping up again.
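The halving rule described above can be sketched as a pure function over recent latency samples. The names `p95` and `adjusted_concurrency` are illustrative, and how latencies are collected is left as an assumption.

```python
import statistics
from typing import Sequence


def p95(latencies_s: Sequence[float]) -> float:
    """95th percentile of observed latencies (needs at least two samples)."""
    # quantiles(n=20) returns 19 cut points; the last one is the 95th percentile.
    return statistics.quantiles(latencies_s, n=20)[-1]


def adjusted_concurrency(
    current: int, observed_p95: float, baseline_p95: float, floor: int = 1
) -> int:
    """Halve concurrency when p95 exceeds 3x the baseline; otherwise keep it."""
    if observed_p95 > 3 * baseline_p95:
        return max(floor, current // 2)
    return current
```

asyncio.Semaphore has no public method for changing its limit after creation, so one option is to apply the adjusted value by constructing a fresh semaphore between chunks rather than mid-chunk.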
File Descriptor Exhaustion
On Linux, the default ulimit -n open-file limit is 1024. Each open socket consumes one file descriptor. With TCPConnector(limit=300) and the OS limit at 1024, other process I/O (log files, database connections) can push the process past the limit, causing OSError: [Errno 24] Too many open files. Raise the limit in your deployment environment (ulimit -n 65536 or equivalent systemd LimitNOFILE) before scaling beyond 200 concurrent connections.
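A quick check at startup can confirm how much descriptor headroom the process has. This sketch uses the standard-library resource module, which is available on Unix only; the `reserve` figure of 100 descriptors for logs and database connections is an arbitrary assumption.

```python
import resource  # standard library, Unix-only


def soft_fd_limit() -> int:
    """Current soft limit on open file descriptors for this process."""
    soft, _hard = resource.getrlimit(resource.RLIMIT_NOFILE)
    return soft


def fd_headroom(soft_limit: int, connector_limit: int, reserve: int = 100) -> int:
    """Descriptors left after the connector's sockets and a reserve for other I/O."""
    return soft_limit - connector_limit - reserve


if __name__ == "__main__":
    limit = soft_fd_limit()
    print(f"soft limit {limit}, headroom {fd_headroom(limit, connector_limit=300)}")
```

A negative or small headroom value is a signal to raise the limit before increasing TCPConnector's limit.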
Integration Note
This asyncio setup is the lowest layer in a broader multi-API routing and fallback chain architecture. The AsyncGeocoder above handles transport-level concerns — concurrency, retries, and connection reuse. The decision of which provider to call first, and how to budget quota across providers, belongs one level up: see API Quota Tracking and Cost Management for Redis-backed counter patterns that feed routing decisions. For addresses that exhaust all retry attempts, structured fallback chains for failed lookups define the deterministic escalation path to secondary and tertiary providers.
Input addresses should be normalised before submission — stripping extraneous whitespace, expanding common abbreviations, and validating postal code format. Passing raw, denormalised strings to the geocoder increases the empty-result 200 failure rate. The core address parsing and standardization pipeline covers the pre-processing steps that feed clean input to the async layer.
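A minimal sketch of the first two normalisation steps (whitespace and abbreviations) is shown below. The abbreviation table is hypothetical and deliberately tiny, the function name is illustrative, and postal code validation is not covered. Note that naive expansion can be wrong for some inputs, for example "St" meaning "Saint".

```python
import re

# Hypothetical, deliberately small table; a real pipeline would need a fuller list.
ABBREVIATIONS = {"st": "Street", "ave": "Avenue", "pkwy": "Parkway", "rd": "Road"}


def normalise_address(raw: str) -> str:
    """Collapse whitespace and expand whole-word abbreviations."""
    collapsed = re.sub(r"\s+", " ", raw).strip()

    def expand(match: "re.Match[str]") -> str:
        word = match.group(0)
        return ABBREVIATIONS.get(word.rstrip(".").lower(), word)

    # \b keeps suffixes such as the "st" in "1st" from being expanded.
    return re.sub(r"\b[A-Za-z]+\b\.?", expand, collapsed)


print(normalise_address("  350   5th Ave,  New York "))
```

Running normalisation before the async layer keeps the geocoder itself free of address-cleaning logic.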
Related
- Building Async Geocoding Requests in Python — Covers request construction, Pydantic response validation, tenacity retry decorators, and load-testing methodology.
- Implementing Fallback Chains for Failed Lookups — Deterministic provider escalation patterns for addresses that fail after all retries.
- Rate Limiting Strategies for Batch Processing — Token-bucket and sliding-window algorithms for staying within provider QPS caps without semaphore-only approaches.
- API Quota Tracking and Cost Management — Redis-backed quota counters and cost-per-address telemetry that inform dynamic routing decisions.