How to Set Up Asyncio for Bulk Geocoding

To set up asyncio for bulk geocoding: initialize an asyncio.Semaphore to cap concurrent outbound requests, route all HTTP traffic through a single aiohttp.ClientSession for connection reuse, and dispatch your address batch with asyncio.gather() wrapped in chunk loops. This is covered in full under Building Async Geocoding Requests in Python — the guide below gives you the precise configuration, component-by-component rationale, and production edge-case handling.

Why Asyncio Changes the Economics of Bulk Geocoding

Bulk geocoding is I/O-bound, not CPU-bound. Each address resolution spends most of its wall-clock time waiting: DNS resolution, TLS negotiation, provider-side lookup, and network transit. A synchronous requests loop leaves the CPU idle during every one of those waits. With 50,000 addresses at an average provider latency of 120 ms, a serial pipeline takes roughly 100 minutes. The same workload through a properly tuned async pipeline — 20 concurrent requests — completes in about 5 minutes.
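The arithmetic behind those two figures can be reproduced with a small estimator. This is an idealized sketch: the function name batch_minutes is hypothetical, and it assumes constant latency, no retries, and no provider rate limit.

```python
def batch_minutes(n_addresses: int, latency_ms: int, concurrency: int = 1) -> float:
    """Idealized wall-clock estimate: total wait time divided across in-flight slots."""
    total_ms = n_addresses * latency_ms
    return total_ms / concurrency / 1000 / 60


print(batch_minutes(50_000, 120))                  # serial loop
print(batch_minutes(50_000, 120, concurrency=20))  # 20 requests in flight
```

Real runs will land somewhat above the estimate once retries and latency variance are included.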

The diagram below shows how the event loop multiplexes network waits across coroutines without spawning threads:

[Figure: Asyncio Event Loop: Coroutine Scheduling for Bulk Geocoding. Timeline showing four geocoding coroutines (coro-1 to coro-4) sharing a single thread. While each coroutine awaits a network response, the event loop runs the next coroutine, achieving parallelism without threads. Legend: CPU work; waiting (yielded to loop). Time axis: t=0 to t=end.]

The Four Configuration Pillars

Every production asyncio geocoding setup needs four tightly coupled components. Omitting any one of them leads to provider bans, memory exhaustion, or silent data loss.

1. asyncio.Semaphore — Concurrency Gate

asyncio.Semaphore(n) allows at most n coroutines to hold the semaphore simultaneously. Any coroutine that enters async with semaphore while all n slots are occupied suspends until another coroutine releases a slot. This is the primary mechanism for matching your throughput to provider-specific QPS limits.
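The gating behaviour can be observed without any network traffic. In the sketch below, fake_request and measure_peak are hypothetical names, and asyncio.sleep stands in for the HTTP round trip. The code records the highest number of coroutines that were inside the semaphore at the same time.

```python
import asyncio


async def fake_request(sem: asyncio.Semaphore, state: dict) -> None:
    async with sem:
        state["in_flight"] += 1
        state["peak"] = max(state["peak"], state["in_flight"])
        await asyncio.sleep(0.01)  # stands in for the HTTP round trip
        state["in_flight"] -= 1


async def measure_peak(total: int = 20, limit: int = 4) -> int:
    """Run `total` fake requests and return the peak number in flight."""
    sem = asyncio.Semaphore(limit)
    state = {"in_flight": 0, "peak": 0}
    await asyncio.gather(*(fake_request(sem, state) for _ in range(total)))
    return state["peak"]


print(asyncio.run(measure_peak(total=20, limit=4)))
```

Although all 20 coroutines are scheduled at once, the peak never exceeds the semaphore's limit.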

A semaphore is not a rate limiter. It limits concurrent requests in flight, not requests per second. If your provider allows 50 req/sec and each request completes in 80 ms, a semaphore of 4 is enough to saturate that quota (50 req/s × 0.08 s = 4 in flight). If latency is variable (40–200 ms), start at 15–20 and watch for 429 responses.
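The in-flight calculation above is Little's law (requests in flight = arrival rate × time in system). A minimal helper, with the hypothetical name concurrency_for_quota, makes it easy to evaluate both ends of a latency range:

```python
import math


def concurrency_for_quota(quota_rps: int, latency_ms: int) -> int:
    """Requests that must be in flight to sustain quota_rps at the given latency."""
    return max(1, math.ceil(quota_rps * latency_ms / 1000))


for latency_ms in (40, 80, 200):
    print(latency_ms, "ms ->", concurrency_for_quota(50, latency_ms), "in flight")
```

The result is the concurrency that just saturates the quota at that latency; any value above it relies on 429 handling to stay within limits.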

2. aiohttp.ClientSession — Connection Pool

A single ClientSession maintains a pool of persistent TCP connections. Creating a new session per coroutine forces a fresh TLS handshake for every address — adding 50–150 ms of overhead per request and exhausting ephemeral port allocations on high-core servers.

Tune TCPConnector for your use case:

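import aiohttp

async def create_session() -> aiohttp.ClientSession:
    # Build the connector and session inside a coroutine so both bind to
    # the running event loop. The caller must close the session when done.
    connector = aiohttp.TCPConnector(
        limit=100,             # max total open sockets
        limit_per_host=30,     # max sockets to any single provider host
        ttl_dns_cache=300,     # cache DNS results for 5 minutes
        keepalive_timeout=30,  # keep idle connections alive 30 s
    )
    return aiohttp.ClientSession(
        connector=connector,
        timeout=aiohttp.ClientTimeout(total=10.0),
    )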

Set limit_per_host to match your provider’s documented connection limit. Exceeding it triggers connection resets rather than 429s, making the root cause harder to diagnose.

3. Exponential Backoff — Retry Logic

HTTP 429 and 5xx responses are expected in high-throughput runs. The correct response is to pause that coroutine and retry, not to fail the entire batch. Cap the sleep duration so that no single coroutine stalls for an unreasonable time between attempts:

import asyncio

async def backoff_sleep(attempt: int, base: float = 1.0, cap: float = 30.0) -> None:
    delay = min(base * (2 ** attempt), cap)
    await asyncio.sleep(delay)

Using asyncio.sleep (not time.sleep) is critical — time.sleep blocks the entire event loop thread, freezing all other concurrent coroutines.
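The difference is easy to measure. The sketch below (hypothetical names pause and timed_pauses) runs five coroutines that each sleep for 0.1 s through asyncio.sleep. Because each sleep yields to the event loop, the waits overlap instead of adding up.

```python
import asyncio
import time


async def pause(delay: float) -> None:
    await asyncio.sleep(delay)  # yields control; other coroutines keep running


async def timed_pauses(n: int = 5, delay: float = 0.1) -> float:
    """Return the wall-clock seconds needed for n concurrent sleeps."""
    start = time.perf_counter()
    await asyncio.gather(*(pause(delay) for _ in range(n)))
    return time.perf_counter() - start


elapsed = asyncio.run(timed_pauses())
print(f"5 concurrent sleeps of 0.1 s took {elapsed:.2f} s")
```

Replacing asyncio.sleep with time.sleep in pause would serialize the waits, so the same run would take roughly n × delay.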

4. Chunked asyncio.gather — Memory Control

asyncio.gather(*tasks) schedules all tasks concurrently and returns results in input order. For a list of 200,000 addresses, creating 200,000 coroutine objects at once allocates every task and its pending result before a single request completes, so memory grows with the batch size rather than with the concurrency limit. Chunking bounds memory to a predictable footprint:

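import asyncio

async def gather_chunks(items, worker, chunk_size: int = 2000):
    """Run worker(item) for every item, one chunk at a time, in input order."""
    results = []
    for i in range(0, len(items), chunk_size):
        # Create coroutines only for the current chunk so memory stays bounded.
        chunk = [worker(item) for item in items[i : i + chunk_size]]
        chunk_results = await asyncio.gather(*chunk, return_exceptions=True)
        results.extend(chunk_results)
    return results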

Always pass return_exceptions=True. Without it, the first provider timeout propagates out of gather() as an exception. The remaining requests keep running, but the caller never receives their results, so the whole chunk is lost.
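A minimal illustration of that behaviour, using a hypothetical lookup coroutine that fails on empty input:

```python
import asyncio


async def lookup(address: str) -> str:
    # Hypothetical stand-in for a geocoding call; fails on empty input.
    if not address:
        raise ValueError("empty address")
    await asyncio.sleep(0)
    return address.upper()


async def run_batch(addresses):
    # Exceptions are returned as values, in the same position as their input.
    return await asyncio.gather(
        *(lookup(a) for a in addresses), return_exceptions=True
    )


results = asyncio.run(run_batch(["a", "", "c"]))
for item in results:
    print("error:" if isinstance(item, Exception) else "ok:", item)
```

The failed lookup appears as an exception object in the second slot, while the first and third results are preserved.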

Complete Production Implementation

The following class wires all four components into a usable, copy-paste-ready geocoder. It returns a list of GeocodeResult dataclasses preserving input order, and logs provider metadata for auditing.

import asyncio
import aiohttp
import logging
from dataclasses import dataclass
from typing import List, Optional

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s | %(levelname)s | %(message)s"
)
logger = logging.getLogger(__name__)


@dataclass
class GeocodeResult:
    address: str
    lat: Optional[float] = None
    lon: Optional[float] = None
    provider: Optional[str] = None
    success: bool = False
    attempts: int = 0


class AsyncGeocoder:
    """
    Async geocoder with semaphore-gated concurrency, connection pooling,
    exponential backoff retries, and a silent secondary-provider fallback.
    """

    PRIMARY_URL = "https://api.primary-geocoder.example/v1/geocode"
    FALLBACK_URL = "https://api.secondary-geocoder.example/lookup"

    def __init__(
        self,
        max_concurrency: int = 15,
        timeout: float = 10.0,
        max_retries: int = 3,
        chunk_size: int = 2000,
    ) -> None:
        self.semaphore = asyncio.Semaphore(max_concurrency)
        self.timeout = aiohttp.ClientTimeout(total=timeout)
        self.max_retries = max_retries
        self.chunk_size = chunk_size
        self.session: Optional[aiohttp.ClientSession] = None

    async def __aenter__(self) -> "AsyncGeocoder":
        connector = aiohttp.TCPConnector(
            limit=100,
            limit_per_host=30,
            ttl_dns_cache=300,
            keepalive_timeout=30,
        )
        self.session = aiohttp.ClientSession(
            connector=connector,
            timeout=self.timeout,
        )
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        if self.session:
            await self.session.close()

    async def _try_primary(self, address: str) -> Optional[GeocodeResult]:
        params = {"q": address, "format": "json"}
        async with self.session.get(self.PRIMARY_URL, params=params) as resp:
            if resp.status == 200:
                data = await resp.json()
                results = data.get("results") or []
                if results:
                    return GeocodeResult(
                        address=address,
                        lat=float(results[0]["lat"]),
                        lon=float(results[0]["lon"]),
                        provider="primary",
                        success=True,
                    )
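            elif resp.status == 429 or resp.status >= 500:
                # Retryable statuses: raise so the caller can back off and retry
                raise aiohttp.ClientResponseError(
                    resp.request_info, resp.history, status=resp.status
                )
        return None  # 200 with empty results, or a non-retryable status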

    async def _try_fallback(self, address: str) -> Optional[GeocodeResult]:
        params = {"address": address}
        async with self.session.get(self.FALLBACK_URL, params=params) as resp:
            if resp.status == 200:
                data = await resp.json()
                loc = data.get("location")
                if loc:
                    return GeocodeResult(
                        address=address,
                        lat=float(loc["lat"]),
                        lon=float(loc["lon"]),
                        provider="fallback",
                        success=True,
                    )
        return None

    async def _geocode_single(self, address: str) -> GeocodeResult:
        last_exc: Optional[Exception] = None
        for attempt in range(self.max_retries):
            try:
                async with self.semaphore:
                    result = await self._try_primary(address)
                    if result:
                        result.attempts = attempt + 1
                        return result
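                    # Primary returned no results: try fallback
                    result = await self._try_fallback(address)
                    if result:
                        result.attempts = attempt + 1
                        return result
                # Neither provider produced a result. Resending the same string
                # will not help, so report a soft failure instead of retrying.
                logger.info("No result for '%s' from either provider.", address)
                return GeocodeResult(address=address, attempts=attempt + 1)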
            except aiohttp.ClientResponseError as exc:
                # HTTP-level errors (429 and others): back off before retrying
                delay = min(1.0 * (2 ** attempt), 30.0)
                logger.warning(
                    "HTTP %s (attempt %d/%d). Sleeping %.1fs.",
                    exc.status, attempt + 1, self.max_retries, delay,
                )
                await asyncio.sleep(delay)
                last_exc = exc
            except (aiohttp.ClientError, asyncio.TimeoutError) as exc:
                delay = min(1.0 * (2 ** attempt), 30.0)
                logger.debug(
                    "Network error for '%s' (attempt %d): %s. Retrying in %.1fs.",
                    address, attempt + 1, exc, delay,
                )
                await asyncio.sleep(delay)
                last_exc = exc

        logger.warning("Failed to geocode '%s' after %d attempts: %s",
                       address, self.max_retries, last_exc)
        return GeocodeResult(address=address, success=False,
                             attempts=self.max_retries)

    async def geocode_batch(self, addresses: List[str]) -> List[GeocodeResult]:
        """Geocode a list of addresses, chunked to bound memory usage."""
        all_results: List[GeocodeResult] = []
        total = len(addresses)
        for i in range(0, total, self.chunk_size):
            chunk = addresses[i : i + self.chunk_size]
            tasks = [self._geocode_single(addr) for addr in chunk]
            chunk_results = await asyncio.gather(*tasks, return_exceptions=True)
            for addr, res in zip(chunk, chunk_results):
                if isinstance(res, Exception):
                    logger.error("Unexpected error for '%s': %s", addr, res)
                    all_results.append(GeocodeResult(address=addr, success=False))
                else:
                    all_results.append(res)
            logger.info(
                "Chunk %d–%d complete (%d/%d addresses)",
                i + 1, min(i + self.chunk_size, total), min(i + self.chunk_size, total), total,
            )
        return all_results


# --- Vectorized pandas variant ---
# For DataFrame workflows, apply geocoding as a batch on the address column:
#
# import asyncio
# import pandas as pd
#
# async def geocode_dataframe(df: pd.DataFrame, address_col: str) -> pd.DataFrame:
#     async with AsyncGeocoder(max_concurrency=15) as geocoder:
#         results = await geocoder.geocode_batch(df[address_col].tolist())
#     # Reuse df's index so the join aligns rows even when it is not 0..n-1.
#     result_df = pd.DataFrame([vars(r) for r in results], index=df.index)
#     return df.join(result_df[["lat", "lon", "provider", "success"]])

Entrypoint:

async def main() -> None:
    addresses = [
        "1600 Amphitheatre Pkwy, Mountain View, CA 94043",
        "350 5th Ave, New York, NY 10118",
        "221B Baker St, London, NW1 6XE",
    ]
    async with AsyncGeocoder(max_concurrency=10, chunk_size=1000) as geocoder:
        results = await geocoder.geocode_batch(addresses)

    for r in results:
        status = f"{r.lat:.5f}, {r.lon:.5f} [{r.provider}]" if r.success else "FAILED"
        print(f"{r.address} -> {status}")


if __name__ == "__main__":
    asyncio.run(main())

Configuration Reference

Parameter | Recommended range | Effect when too low | Effect when too high
max_concurrency (Semaphore) | 10–50 | Underutilises provider quota; slow throughput | Triggers 429s; exhausts file descriptors
TCPConnector limit | 100–300 | Connection queuing; increased latency | OS socket limit hit; OSError: [Errno 24]
limit_per_host | 20–50 | Under-uses keep-alive pool | Provider resets connections
ClientTimeout(total) | 8–15 s | Aborts valid slow responses | Hung connections hold semaphore slots for longer
chunk_size | 1,000–5,000 | Excessive overhead; slow progress logging | Memory pressure; no intermediate checkpointing
max_retries | 3–5 | Drops addresses on transient failures | Stalls pipeline on unreachable providers

Edge Cases and Failure Modes

Empty-Result 200 Responses

Geocoding providers frequently return HTTP 200 with an empty results array for ambiguous or unstructured input. These are not network errors, so exception handling will not catch them. The _try_primary method above checks that results is non-empty before building a success and returns None otherwise; _geocode_single then falls through to the fallback provider. Treat empty-result 200s as soft failures and log the raw address for a separate normalization pass before resubmission.
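One way to collect soft failures for that normalization pass is to split a finished batch into resolved results and a retry queue. The sketch below redefines a GeocodeResult with the same fields as the class above so that it runs on its own; split_results is a hypothetical helper name.

```python
from dataclasses import dataclass
from typing import List, Optional, Tuple


@dataclass
class GeocodeResult:  # same fields as the GeocodeResult defined earlier
    address: str
    lat: Optional[float] = None
    lon: Optional[float] = None
    provider: Optional[str] = None
    success: bool = False
    attempts: int = 0


def split_results(
    results: List[GeocodeResult],
) -> Tuple[List[GeocodeResult], List[str]]:
    """Return (resolved results, raw addresses to normalize and resubmit)."""
    resolved = [r for r in results if r.success]
    retry_queue = [r.address for r in results if not r.success]
    return resolved, retry_queue


batch = [
    GeocodeResult("350 5th Ave, New York, NY 10118", 40.7484, -73.9857, "primary", True, 1),
    GeocodeResult("5th ave nyc", attempts=1),
]
resolved, retry_queue = split_results(batch)
print(len(resolved), "resolved;", retry_queue, "queued for normalization")
```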

Semaphore Starvation Under High Latency

If provider latency spikes to 5–8 seconds (common during peak hours), all max_concurrency semaphore slots fill quickly. New coroutines queue behind them, and the effective throughput collapses. Monitor p95 latency in your logs; if it exceeds 3× the normal value, temporarily reduce max_concurrency by half to drain the backlog before ramping up again.
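The halving rule can be expressed as a small decision function. This is a sketch under stated assumptions: p95 and next_concurrency are hypothetical names, latencies are collected in seconds by the caller, and the baseline p95 is measured separately. Because asyncio.Semaphore cannot be resized, the returned value would have to be applied by constructing a new geocoder between chunks.

```python
import statistics
from typing import Sequence


def p95(latencies_s: Sequence[float]) -> float:
    # quantiles(n=20) returns 19 cut points; the last is the 95th percentile.
    return statistics.quantiles(latencies_s, n=20)[-1]


def next_concurrency(
    current: int, latencies_s: Sequence[float], baseline_p95_s: float
) -> int:
    """Halve concurrency when observed p95 exceeds 3x the baseline."""
    if p95(latencies_s) > 3 * baseline_p95_s:
        return max(1, current // 2)
    return current


print(next_concurrency(20, [6.0] * 100, baseline_p95_s=0.5))  # latency spike
print(next_concurrency(20, [0.4] * 100, baseline_p95_s=0.5))  # normal latency
```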

File Descriptor Exhaustion

On Linux, the default soft limit on open files (ulimit -n) is commonly 1024. Each open socket consumes one file descriptor. With TCPConnector(limit=300) and the OS limit at 1024, other process I/O (log files, database connections) can push the process past the limit, causing OSError: [Errno 24] Too many open files. Raise the limit in your deployment environment (ulimit -n 65536 or equivalent systemd LimitNOFILE) before scaling beyond 200 concurrent connections.
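A startup check can surface this before the first request is sent. The sketch below uses the standard-library resource module, which is available on Unix only. The helper names and the reserve of 64 descriptors for logs and other I/O are assumptions, not measured values.

```python
import resource  # Unix-only standard-library module


def current_soft_limit() -> int:
    """Soft limit on open file descriptors for this process."""
    soft, _hard = resource.getrlimit(resource.RLIMIT_NOFILE)
    return soft


def fd_headroom(soft_limit: int, connector_limit: int, reserved: int = 64) -> int:
    """Descriptors left after the connector pool and a reserve for other I/O."""
    return soft_limit - connector_limit - reserved


headroom = fd_headroom(current_soft_limit(), connector_limit=300)
if headroom < 0:
    print("Open-file limit is too low for this connector configuration")
```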

Integration Note

This asyncio setup is the lowest layer in a broader multi-API routing and fallback chain architecture. The AsyncGeocoder above handles transport-level concerns — concurrency, retries, and connection reuse. The decision of which provider to call first, and how to budget quota across providers, belongs one level up: see API Quota Tracking and Cost Management for Redis-backed counter patterns that feed routing decisions. For addresses that exhaust all retry attempts, structured fallback chains for failed lookups define the deterministic escalation path to secondary and tertiary providers.

Input addresses should be normalised before submission — stripping extraneous whitespace, expanding common abbreviations, and validating postal code format. Passing raw, denormalised strings to the geocoder increases the empty-result 200 failure rate. The core address parsing and standardization pipeline covers the pre-processing steps that feed clean input to the async layer.
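As a rough illustration of those three steps, the sketch below collapses whitespace, expands a few abbreviations, and checks for a postal code. Everything here is an assumption for demonstration: the ABBREVIATIONS table is deliberately tiny, the postal check covers US ZIP codes only, and a naive expansion such as "St" to "Street" would be wrong for names like "St Louis".

```python
import re

# Hypothetical, deliberately small abbreviation table.
ABBREVIATIONS = {"st": "Street", "ave": "Avenue", "pkwy": "Parkway", "rd": "Road"}
_WORD = re.compile(r"\b([A-Za-z]+)\b\.?")
_US_ZIP = re.compile(r"\b\d{5}(?:-\d{4})?\b")


def normalize_address(raw: str) -> str:
    """Collapse whitespace and expand known street-type abbreviations."""
    collapsed = " ".join(raw.split())

    def expand(match: re.Match) -> str:
        # Unknown words are returned unchanged, including any trailing dot.
        return ABBREVIATIONS.get(match.group(1).lower(), match.group(0))

    return _WORD.sub(expand, collapsed)


def has_us_zip(address: str) -> bool:
    """True when the string contains a 5-digit or ZIP+4 code."""
    return _US_ZIP.search(address) is not None


print(normalize_address("  350 5th  Ave,  New York, NY 10118 "))
```

Addresses that fail the postal check can be routed to manual review instead of being sent to the geocoder.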