Building Async Geocoding Requests in Python

As part of the Multi-API Routing & Fallback Chains architecture, this page covers one specific problem: replacing blocking, one-at-a-time geocoding calls with a non-blocking concurrent dispatcher that processes thousands of addresses in parallel without violating provider rate limits.

Modern address enrichment pipelines routinely resolve tens of thousands of locations per hour. Synchronous HTTP calls create an immediate bottleneck: each request blocks the calling thread until the provider responds, so the process sits idle on network I/O and wall-clock time grows linearly with the number of addresses. An async dispatcher removes this constraint by combining non-blocking I/O, connection pooling, and bounded concurrency through Python’s asyncio primitives.


Figure: Async geocoding pipeline architecture. Addresses enter an input queue (1. enqueue), pass an asyncio Semaphore concurrency cap plus an aiolimiter rate gate (2. gate), fan out to Provider A, B, and C via aiohttp GET (3. fetch), go through Pydantic schema validation with failures sent to a DLQ (4. validate), and valid results are collected in a results list (5. collect).

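Prerequisites

Python 3.10+ with aiohttp, aiolimiter, pydantic>=2, and tenacity installed; pandas is needed only for the DataFrame variant.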

Production-Ready Workflow

Step 1 — Normalize input addresses

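Before dispatching any HTTP request, parse and standardize each address to reduce provider-side ambiguity. Strip leading/trailing whitespace, collapse internal whitespace runs, normalize Unicode to NFC, and confirm that postal-code and city fields are present. Malformed inputs that fail this step should be routed to a separate error queue rather than sent to the provider. The minimal normalizer below covers the whitespace and Unicode steps and rejects suspiciously short strings; checking for postal-code and city fields needs an address parser or rules specific to your data.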

import re
import unicodedata
from typing import Optional

_MULTI_SPACE = re.compile(r"\s+")  # any run of spaces, tabs, or newlines

def normalize_address(raw: str) -> Optional[str]:
    """Return a cleaned address string, or None if structurally unusable."""
    text = unicodedata.normalize("NFC", raw.strip())
    text = _MULTI_SPACE.sub(" ", text)  # collapse each whitespace run to one space
    return text if len(text) >= 10 else None  # length heuristic only; no field checks
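The normalizer above only applies a length floor. A minimal sketch of the field-presence check and error-queue routing described in this step follows. The comma-count rule and the US five-digit ZIP pattern are hypothetical heuristics for US-style addresses, not a general address parser, and partition_addresses is an illustrative name.

```python
import re
import unicodedata
from typing import List, Optional, Tuple

_WS_RUN = re.compile(r"\s+")
# Hypothetical heuristic: a US ZIP or ZIP+4 anywhere in the string.
_US_ZIP = re.compile(r"\b\d{5}(?:-\d{4})?\b")


def normalize_address(raw: str) -> Optional[str]:
    """Strip, NFC-normalize, and collapse whitespace; None if too short."""
    text = _WS_RUN.sub(" ", unicodedata.normalize("NFC", raw.strip()))
    return text if len(text) >= 10 else None


def partition_addresses(
    raw_addresses: List[str],
) -> Tuple[List[str], List[Tuple[str, str]]]:
    """Split inputs into dispatchable addresses and an error queue of (raw, reason)."""
    clean: List[str] = []
    error_queue: List[Tuple[str, str]] = []
    for raw in raw_addresses:
        text = normalize_address(raw)
        if text is None:
            error_queue.append((raw, "too short"))
        elif text.count(",") < 2:
            # Expect at least "street, city, region + postcode".
            error_queue.append((raw, "missing city segment"))
        elif not _US_ZIP.search(text):
            error_queue.append((raw, "missing postal code"))
        else:
            clean.append(text)
    return clean, error_queue
```

Only the clean list goes to the dispatcher; the error queue can be written out for review alongside the provider-side failures collected in Step 6.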

Step 2 — Initialize a shared connection pool

Create one aiohttp.ClientSession per pipeline run. Reusing the session across all coroutines eliminates redundant TLS handshakes and DNS lookups. Use TCPConnector to cap simultaneous sockets and cache DNS resolutions.

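from aiohttp import ClientSession, ClientTimeout, TCPConnector

async def make_session() -> ClientSession:
    # Build inside a coroutine: aiohttp expects a running event loop here.
    # (make_session is an illustrative helper name.)
    connector = TCPConnector(
        limit=100,             # max open sockets; keep >= the Semaphore ceiling
        ttl_dns_cache=300,     # seconds to cache resolved hostnames
        keepalive_timeout=30,  # seconds an idle pooled connection stays open
    )
    return ClientSession(
        connector=connector,
        timeout=ClientTimeout(total=10),  # per-request wall-clock timeout
    )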

Always close the session when the pipeline finishes. The async context-manager pattern in the implementation section below handles this automatically.

Step 3 — Apply concurrency and rate controls

Two complementary primitives control throughput:

  • asyncio.Semaphore(n): caps simultaneous in-flight coroutines (connection-level control).
  • AsyncLimiter(max_rate, time_period) from aiolimiter: allows at most max_rate acquisitions per time_period seconds, using a leaky-bucket algorithm (time-level control).

import asyncio
from aiolimiter import AsyncLimiter

PROVIDER_QPS = 20          # provider's documented requests-per-second limit
MAX_CONCURRENCY = PROVIDER_QPS - 2   # leave headroom for retries

semaphore = asyncio.Semaphore(MAX_CONCURRENCY)
rate_limiter = AsyncLimiter(max_rate=PROVIDER_QPS, time_period=1.0)

Using both together means a coroutine must acquire a semaphore slot and a rate-limiter token before sending — preventing both socket exhaustion and QPS overruns simultaneously. For multi-provider pipelines, see rate-limiting strategies for batch processing for token-bucket algorithm selection across heterogeneous provider limits.
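To see the two controls interact without calling a real provider, the sketch below swaps aiolimiter for a hypothetical stdlib SimpleRateLimiter that admits one caller every 1/qps seconds (stricter than aiolimiter's leaky bucket, which allows short bursts) and replaces the HTTP call with asyncio.sleep. The nesting order, limiter outside and semaphore inside, mirrors the implementation further down.

```python
import asyncio
import time
from typing import Tuple


class SimpleRateLimiter:
    """Hypothetical stand-in for aiolimiter.AsyncLimiter: admits one caller
    every 1/qps seconds, with no burst allowance."""

    def __init__(self, qps: float) -> None:
        self._interval = 1.0 / qps
        self._next_slot = 0.0
        self._lock = asyncio.Lock()

    async def __aenter__(self) -> None:
        async with self._lock:
            now = time.monotonic()
            start = max(now, self._next_slot)  # earliest slot this caller may use
            self._next_slot = start + self._interval
        await asyncio.sleep(start - now)  # 0 when the slot is already open

    async def __aexit__(self, *exc: object) -> None:
        return None  # like aiolimiter, capacity is consumed on entry only


async def gated_dispatch(
    n_requests: int, max_concurrency: int, qps: float, latency_s: float
) -> Tuple[int, float]:
    """Run fake requests through limiter + semaphore; return (peak in-flight, elapsed s)."""
    limiter = SimpleRateLimiter(qps)
    semaphore = asyncio.Semaphore(max_concurrency)
    in_flight = 0
    peak = 0

    async def fake_request() -> None:
        nonlocal in_flight, peak
        async with limiter:        # time-level control
            async with semaphore:  # connection-level control
                in_flight += 1
                peak = max(peak, in_flight)
                await asyncio.sleep(latency_s)  # simulated provider round trip
                in_flight -= 1

    started = time.monotonic()
    await asyncio.gather(*(fake_request() for _ in range(n_requests)))
    return peak, time.monotonic() - started
```

With qps=100 and 50 ms latency, about five requests would overlap if only the rate limit applied; a semaphore of 3 holds the peak at 3. Because the limiter is outermost, a request that takes a token while every slot is busy then waits for a slot with its token already spent.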

Step 4 — Dispatch concurrent tasks

asyncio.gather() launches all coroutines concurrently and returns results in the same order as the inputs. Pass return_exceptions=True so a single failure does not abort the entire batch.

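async def dispatch_all(session, addresses):
    # dispatch_all / fetch_geocode are placeholders for your own coroutines
    tasks = [fetch_geocode(session, addr) for addr in addresses]
    return await asyncio.gather(*tasks, return_exceptions=True)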

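For early-termination scenarios (for example, stopping as soon as the first N results arrive), use asyncio.as_completed() instead. It yields awaitables in completion order rather than input order, and you must cancel the still-pending tasks yourself once you have enough results.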
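A minimal sketch of the early-termination pattern, assuming a hypothetical fake_geocode coroutine whose delay stands in for provider latency: tasks are created up front, results are consumed in completion order, and whatever is still running is cancelled and awaited before returning.

```python
import asyncio
from typing import Dict, List


async def fake_geocode(address: str, delay: float) -> str:
    """Hypothetical stand-in for a provider call with the given latency."""
    await asyncio.sleep(delay)
    return f"resolved:{address}"


async def first_n_results(jobs: Dict[str, float], n: int) -> List[str]:
    """Return the first n results in completion order, then cancel the rest."""
    tasks = [asyncio.create_task(fake_geocode(a, d)) for a, d in jobs.items()]
    done: List[str] = []
    try:
        for next_finished in asyncio.as_completed(tasks):
            done.append(await next_finished)
            if len(done) >= n:
                break
    finally:
        for task in tasks:
            task.cancel()  # no-op for tasks that already finished
        # Wait for cancellations to settle so no task outlives this call.
        await asyncio.gather(*tasks, return_exceptions=True)
    return done
```

Here asyncio.run(first_n_results({"slow": 0.5, "fast": 0.02, "medium": 0.08}, 2)) returns the fast and medium results and cancels the slow call instead of waiting for it.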

Step 5 — Validate responses with Pydantic

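Never trust raw provider payloads. Build each result through a Pydantic model with required fields, so a missing or non-numeric lat/lon raises a ValidationError rather than silently propagating None values into your coordinate store. Pydantic's default (lax) mode still coerces numeric strings such as "37.4" to float; enable strict mode if those should be rejected too.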

Step 6 — Collect, log, and route failures

Separate valid results from exceptions. Log each failure with the originating address and exception type. Route unrecoverable failures to a dead-letter queue (DLQ) for manual inspection or fallback to a secondary provider.
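One way to implement this routing, sketched with the standard library only: transient transport errors are handed back for a secondary provider, everything else is appended to a JSON Lines file acting as the DLQ. The split between "transient" and "unrecoverable" and the file-based DLQ are assumptions; route_failures is an illustrative name.

```python
import asyncio
import json
import logging
from pathlib import Path
from typing import List, Tuple

logger = logging.getLogger(__name__)

# Assumption: transport failures deserve a second try on another provider.
# In a real aiohttp pipeline, add aiohttp.ClientConnectionError to this tuple.
TRANSIENT_ERRORS: Tuple[type, ...] = (asyncio.TimeoutError, TimeoutError, ConnectionError)


def route_failures(failed: List[Tuple[str, BaseException]], dlq_path: Path) -> List[str]:
    """Append unrecoverable failures to a JSON Lines DLQ file and return the
    addresses that should be retried on a secondary provider."""
    retry_elsewhere: List[str] = []
    with dlq_path.open("a", encoding="utf-8") as dlq:
        for address, exc in failed:
            if isinstance(exc, TRANSIENT_ERRORS):
                logger.info("retry on secondary | address=%r error=%s", address, type(exc).__name__)
                retry_elsewhere.append(address)
            else:
                logger.warning("dead-lettered | address=%r error=%s", address, type(exc).__name__)
                record = {"address": address, "error_type": type(exc).__name__, "message": str(exc)}
                dlq.write(json.dumps(record) + "\n")
    return retry_elsewhere
```

The failed list returned by process_batch in the implementation below has exactly the (address, exception) shape this function expects.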


Primary Code Implementation

The class below is a self-contained, production-grade async geocoder. It implements Steps 2 through 6 above (input normalization from Step 1 is left to the caller) as an async context manager, with chunked batch dispatch to control memory pressure.

"""
async_geocoder.py — Production async geocoding dispatcher.

Dependencies: aiohttp, pydantic>=2, tenacity, aiolimiter
Python: 3.10+
"""

import asyncio
import logging
from typing import List, Optional, Tuple

from aiohttp import ClientSession, TCPConnector, ClientTimeout, ClientResponseError
from aiolimiter import AsyncLimiter
from pydantic import BaseModel, ValidationError
from tenacity import (
    retry,
    stop_after_attempt,
    wait_exponential,
    retry_if_exception_type,
    before_sleep_log,
)

logger = logging.getLogger(__name__)


class GeocodeResult(BaseModel):
    """Validated geocoding result from a single provider response."""

    address: str
    lat: float
    lon: float
    confidence: Optional[float] = None
    provider: str


class AsyncGeocoder:
    """
    Non-blocking geocoder with semaphore-gated concurrency, token-bucket
    rate limiting, exponential-backoff retries, and Pydantic validation.

    Usage::

        async with AsyncGeocoder(api_key="...", base_url="https://...") as gc:
            results = await gc.process_batch(addresses)
    """

    def __init__(
        self,
        api_key: str,
        base_url: str,
        provider_name: str = "provider",
        max_concurrency: int = 18,
        qps: float = 20.0,
        request_timeout: float = 10.0,
        chunk_size: int = 5_000,
    ) -> None:
        self.api_key = api_key
        self.base_url = base_url.rstrip("/")
        self.provider_name = provider_name
        self.chunk_size = chunk_size
        self._max_concurrency = max_concurrency
        self._semaphore = asyncio.Semaphore(max_concurrency)
        self._limiter = AsyncLimiter(max_rate=qps, time_period=1.0)
        self._timeout = ClientTimeout(total=request_timeout)
        self._session: Optional[ClientSession] = None

    async def __aenter__(self) -> "AsyncGeocoder":
        connector = TCPConnector(
            limit=self._max_concurrency + 10,  # sockets ≥ concurrency ceiling
            ttl_dns_cache=300,
            keepalive_timeout=30,
        )
        self._session = ClientSession(connector=connector, timeout=self._timeout)
        return self

    async def __aexit__(self, *_: object) -> None:
        if self._session:
            await self._session.close()

    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=2, max=10),
        retry=retry_if_exception_type(
            (asyncio.TimeoutError, ConnectionError, ClientResponseError)
        ),
        before_sleep=before_sleep_log(logger, logging.WARNING),
        reraise=True,  # surface the last real exception, not tenacity.RetryError
    )
    async def _fetch_one(self, address: str) -> GeocodeResult:
        """Fetch and validate a single geocoding response, with retries."""
        if self._session is None:  # an assert would be stripped under python -O
            raise RuntimeError("Use AsyncGeocoder as an async context manager.")

        async with self._limiter:
            async with self._semaphore:
                async with self._session.get(
                    f"{self.base_url}/geocode",
                    params={"q": address, "key": self.api_key, "format": "json"},
                ) as resp:
                    resp.raise_for_status()
                    payload = await resp.json()

        # .get() turns a missing key into None, which Pydantic rejects for a
        # float field with a ValidationError; let it propagate to the caller.
        return GeocodeResult(
            address=address,
            lat=payload.get("lat"),
            lon=payload.get("lon"),
            confidence=payload.get("confidence"),
            provider=self.provider_name,
        )

    async def _process_chunk(
        self, chunk: List[str]
    ) -> Tuple[List[GeocodeResult], List[Tuple[str, Exception]]]:
        """Process one chunk; return (valid_results, failures)."""
        tasks = [self._fetch_one(addr) for addr in chunk]
        raw = await asyncio.gather(*tasks, return_exceptions=True)

        valid: List[GeocodeResult] = []
        failed: List[Tuple[str, Exception]] = []
        for addr, outcome in zip(chunk, raw):
            # BaseException also catches CancelledError, which gather() returns as a result
            if isinstance(outcome, BaseException):
                logger.warning("geocode failed | address=%r error=%s", addr, outcome)
                failed.append((addr, outcome))
            else:
                valid.append(outcome)
        return valid, failed

    async def process_batch(
        self, addresses: List[str]
    ) -> Tuple[List[GeocodeResult], List[Tuple[str, Exception]]]:
        """
        Geocode *addresses* in memory-safe chunks.

        Returns (all_valid, all_failed) tuples so callers can route failures
        to a dead-letter queue or secondary provider without a separate scan.
        """
        all_valid: List[GeocodeResult] = []
        all_failed: List[Tuple[str, Exception]] = []
        total_chunks = (len(addresses) + self.chunk_size - 1) // self.chunk_size

        for i in range(0, len(addresses), self.chunk_size):
            chunk = addresses[i : i + self.chunk_size]
            chunk_num = i // self.chunk_size + 1
            valid, failed = await self._process_chunk(chunk)
            all_valid.extend(valid)
            all_failed.extend(failed)
            logger.info(
                "chunk %d/%d complete | resolved=%d failed=%d",
                chunk_num,
                total_chunks,
                len(valid),
                len(failed),
            )

        return all_valid, all_failed

Entry point:

async def run_pipeline() -> None:
    addresses = [
        "1600 Amphitheatre Pkwy, Mountain View, CA 94043",
        "350 5th Ave, New York, NY 10118",
    ]
    async with AsyncGeocoder(
        api_key="your_key",
        base_url="https://api.example.com",
        provider_name="example",
        max_concurrency=18,
        qps=20.0,
    ) as geocoder:
        valid, failed = await geocoder.process_batch(addresses)
        print(f"Resolved: {len(valid)}  Failed: {len(failed)}")

if __name__ == "__main__":
    asyncio.run(run_pipeline())

Vectorized pandas variant

When the address list arrives as a pandas.DataFrame, avoid an explicit per-row loop: collect the distinct addresses once, geocode them as a single batch, and map the results back by address value.

import asyncio
from typing import Any, List

import pandas as pd


async def geocode_dataframe(
    df: pd.DataFrame,
    address_col: str = "address",
    **geocoder_kwargs: Any,
) -> pd.DataFrame:
    """
    Geocode *address_col* and return a copy of *df* with lat, lon,
    geocode_confidence and geocode_provider columns appended.
    """
    col = df[address_col]
    # Skip NaN and blank cells, and send each distinct address only once.
    addresses: List[str] = [a for a in col.dropna().unique() if str(a).strip()]

    async with AsyncGeocoder(**geocoder_kwargs) as geocoder:
        valid, _ = await geocoder.process_batch(addresses)

    result_map = {r.address: r for r in valid}
    out = df.copy()
    # Series.map(dict) looks each cell up by value; misses become NaN.
    out["lat"] = col.map({a: r.lat for a, r in result_map.items()})
    out["lon"] = col.map({a: r.lon for a, r in result_map.items()})
    out["geocode_confidence"] = col.map({a: r.confidence for a, r in result_map.items()})
    out["geocode_provider"] = col.map({a: r.provider for a, r in result_map.items()})
    return out


# Usage:
# df = asyncio.run(geocode_dataframe(df, address_col="full_address",
#     api_key="...", base_url="https://...", provider_name="example"))

Dropping NaN and blank cells before dispatch keeps them away from the geocoder, and deduplicating means repeated addresses cost a single request. Rows whose address was blank or failed to geocode get NaN in the new columns, which downstream dropna() or fillna() steps can handle uniformly. The input frame is not modified.


Provider Parameter Reference

Parameter | Recommended value | Rationale
TCPConnector(limit=...) | max_concurrency + 10 | Keeps the socket pool above the Semaphore ceiling so aiohttp's internal queue does not mask the true concurrency
TCPConnector(ttl_dns_cache=...) | 300 seconds | Caches DNS results across the batch; removes a lookup from each request
TCPConnector(keepalive_timeout=...) | 30 seconds | Keeps idle pooled connections open through short inter-chunk pauses
ClientTimeout(total=...) | 10 seconds | Hard deadline per request; prevents zombie tasks under provider instability
asyncio.Semaphore(n) | floor(QPS) - 2 | Caps in-flight requests and leaves slots for retries; the QPS ceiling itself is enforced by AsyncLimiter
AsyncLimiter(max_rate=...) | Provider QPS × 0.9 | 10 % safety margin absorbs bursts from retry storms
stop_after_attempt(...) | 3 | Three total attempts per address; avoids unbounded retry loops
wait_exponential(multiplier=1, min=2, max=10) | 2 s floor, 10 s cap | With 3 attempts there are two sleeps, both 2 s; later attempts would wait 4 s, 8 s, then 10 s. tenacity does not read Retry-After headers
chunk_size | 5_000 | Caps simultaneous coroutine objects to control resident memory

Edge Cases

1. Provider returns 200 OK with an empty result set

Some providers (notably Nominatim) return HTTP 200 with an empty JSON array [] when no match is found. raise_for_status() does not catch this — add an explicit emptiness check before constructing GeocodeResult:

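# After resp.json(): an empty list or empty object means "no match".
# ValueError is not in the retry list, so the address fails fast.
if not payload:
    raise ValueError(f"No geocoding results for address: {address!r}")
result_data = payload[0] if isinstance(payload, list) else payload
# Build GeocodeResult from result_data (not payload) from here on.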

2. Provider enforces per-IP burst limits independent of QPS

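Some APIs allow 20 QPS sustained but reject any burst window that exceeds 5 requests in 200 ms. Add a secondary short-window limiter in front of the sustained one:

# In __init__: a second limiter for the short burst window
self._burst_limiter = AsyncLimiter(max_rate=5, time_period=0.2)

# In _fetch_one: clear both limiters before taking a semaphore slot
async with self._burst_limiter:
    async with self._limiter:
        async with self._semaphore:
            ...  # issue the request exactly as in _fetch_one

aiolimiter's leaky bucket lets up to max_rate requests through at once and then drains at max_rate / time_period per second, so in the worst case up to roughly twice that many requests can fall inside one 200 ms window. If the provider counts strictly, AsyncLimiter(max_rate=1, time_period=0.04) spaces requests evenly at the same average rate instead.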

3. Unicode addresses trigger provider 400 errors

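Providers that predate full Unicode support may reject addresses containing non-ASCII characters with a 400 error. aiohttp already percent-encodes values passed through params=, so the request itself is valid; if 400s persist, check for compatibility characters such as full-width digits, full-width Latin letters, or the ideographic space (U+3000). For non-Latin address data, apply NFKC normalization (rather than the NFC used in Step 1) during input normalization so these characters are folded to their plain equivalents before dispatch, instead of relying on the provider to normalize them server-side.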
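A small sketch of that NFKC step, using only unicodedata; fold_for_dispatch is an illustrative name. NFC leaves full-width forms untouched, while NFKC maps them to ASCII.

```python
import unicodedata


def fold_for_dispatch(address: str) -> str:
    """NFKC-normalize and collapse whitespace before sending to a provider."""
    # NFKC folds compatibility characters: full-width digits and Latin letters
    # become ASCII, and the ideographic space U+3000 becomes a plain space.
    return " ".join(unicodedata.normalize("NFKC", address).split())
```

For example, fold_for_dispatch("１２３　Ｍａｉｎ　Ｓｔ") yields "123 Main St". NFKC also rewrites characters such as superscript digits ("²" becomes "2"), so apply it only to fields where that folding is acceptable.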

4. Event loop already running (Jupyter, FastAPI)

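Calling asyncio.run() where an event loop is already running, such as a Jupyter notebook cell or a FastAPI route handler, raises RuntimeError: asyncio.run() cannot be called from a running event loop. In notebooks, await the coroutine directly. In FastAPI, await it inside an async route:

from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()
class BatchRequest(BaseModel):
    addresses: list[str]

@app.post("/geocode")
async def geocode_endpoint(payload: BatchRequest) -> dict:
    async with AsyncGeocoder(api_key="your_key", base_url="https://api.example.com") as geocoder:
        valid, failed = await geocoder.process_batch(payload.addresses)
    return {"resolved": len(valid), "failed": len(failed)}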

5. Chunked dispatch causes order mismatches in the DataFrame

process_batch returns results only for addresses that succeeded. If callers need to re-join results to the original index, key on GeocodeResult.address rather than positional index — the vectorized variant above demonstrates this pattern with result_map.


Performance and Vectorization

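Throughput scaling: At 20 QPS with a 250 ms average provider latency, 18 semaphore slots could in principle complete about 72 requests per second (18 in flight × 4 completions per slot per second), but the AsyncLimiter caps sustained throughput at 20 resolved addresses per second. By Little's law, the concurrency needed to saturate the rate limit is QPS × average latency in seconds, here 20 × 0.25 = 5 slots. Slots beyond that add headroom for latency spikes and retries but no steady-state throughput, while increasing memory consumption.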
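Little's law gives a quick sanity check for these numbers. The helpers below are illustrative: one returns the concurrency needed to keep a rate limit saturated, the other a lower-bound runtime for a batch, taking whichever of the rate cap or the slot cap binds first. They ignore retries, chunk boundaries, and latency variance.

```python
import math


def required_concurrency(qps: float, avg_latency_s: float) -> int:
    """Little's law: requests in flight = arrival rate × time in system."""
    return math.ceil(qps * avg_latency_s)


def estimated_wall_clock_s(n: int, qps: float, concurrency: int, avg_latency_s: float) -> float:
    """Lower-bound runtime for n requests under both a rate cap and a slot cap."""
    rate_bound = n / qps                                # limiter: n ÷ QPS
    concurrency_bound = n * avg_latency_s / concurrency  # slots: n × latency ÷ slots
    return max(rate_bound, concurrency_bound)
```

With 10 000 addresses at 20 QPS and 250 ms latency, 18 slots alone would allow about 139 s, but the rate limit makes 500 s the real floor; a single slot takes 2 500 s.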

Memory pressure: Each pending coroutine, once gather() wraps it in a Task, costs on the order of 1 KB of heap memory in Python 3.10; measure for your own workload, since response buffers add more. At that rate a chunk_size of 5,000 allocates roughly 5 MB of coroutine overhead per chunk, which is negligible on standard data-engineering hardware. Raising chunk_size reduces inter-chunk logging overhead at the cost of later failure visibility.
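Rather than relying on a rule of thumb, you can measure the per-task overhead on your own interpreter with tracemalloc. The sketch below parks n tasks on an asyncio.Event, the way a request parks while awaiting network I/O, and divides the traced heap growth by n; the function names are illustrative.

```python
import asyncio
import tracemalloc


async def _parked(release: asyncio.Event) -> None:
    await release.wait()  # suspends like a request awaiting network I/O


async def bytes_per_pending_task(n: int = 5_000) -> float:
    """Approximate heap bytes per pending Task, measured with tracemalloc."""
    release = asyncio.Event()
    tracemalloc.start()
    before, _ = tracemalloc.get_traced_memory()
    tasks = [asyncio.create_task(_parked(release)) for _ in range(n)]
    await asyncio.sleep(0)  # let every task run up to release.wait()
    after, _ = tracemalloc.get_traced_memory()
    tracemalloc.stop()
    release.set()
    await asyncio.gather(*tasks)
    return (after - before) / n
```

Run it with print(f"{asyncio.run(bytes_per_pending_task()):.0f} bytes per task"). Real request coroutines also hold sessions, buffers, and parsed JSON, so treat the figure as a floor.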

DNS resolution: ttl_dns_cache=300 prevents repeated DNS lookups across a batch run. For providers with load-balanced endpoints, periodic re-resolution is acceptable — lower ttl_dns_cache to 60 if the provider documentation recommends it.

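Connection reuse vs. latency: keepalive_timeout=30 keeps idle HTTP/1.1 keep-alive connections in the pool for up to 30 seconds, which covers short pauses between chunks. If the gap between chunks regularly exceeds 30 seconds (e.g., because of CPU-intensive post-processing between process_batch calls), raise keepalive_timeout to 60 or persist the session across multiple batch calls. The provider's own idle timeout still applies, so a longer client-side value only helps if the server keeps the connection open that long.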

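Baseline estimates (arithmetic, not measured; 10 000 addresses, single provider, 250 ms average latency as in the throughput example above):

Strategy | Estimated wall-clock time | Notes
Synchronous requests | ~2 500 s | 10 000 × 250 ms, one request at a time
Async, concurrency=1 | ~2 500 s | Async overhead, no parallelism gain
Async, concurrency=18, QPS=20 | ~500 s | Rate-limit-bound: 10 000 ÷ 20 QPS (18 slots alone would allow ~140 s)
Async, concurrency=50, QPS=50 | ~200 s | Rate-limit-bound: 10 000 ÷ 50 QPS (50 slots alone would allow ~50 s); higher-tier provider plan required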

Troubleshooting

aiohttp.ServerDisconnectedError mid-batch

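Root cause: The provider closed a keep-alive connection that aiohttp was about to reuse before your timeout expired; on some APIs this happens after roughly 100 requests per connection. Fix: Make sure the retry_if_exception_type tuple covers ServerDisconnectedError. The built-in ConnectionError does not match it, because aiohttp's connection errors derive from aiohttp.ClientConnectionError; listing that parent class covers ServerDisconnectedError as well as refused connections and DNS failures:

import asyncio
from aiohttp import ClientConnectionError, ClientResponseError
from tenacity import retry_if_exception_type

# Pass as the retry= argument of the @retry(...) decorator on _fetch_one
retry_policy = retry_if_exception_type(
    (asyncio.TimeoutError, ClientConnectionError, ClientResponseError)
)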

HTTP 429 responses not triggering retries

Root cause: aiohttp does not raise on 4xx or 5xx statuses by default. ClientResponseError is raised only by resp.raise_for_status() (or automatically if the session was created with ClientSession(raise_for_status=True)), so code that branches on the status or parses the body before that call treats a 429 as a successful response. Fix: Always call resp.raise_for_status() unconditionally before parsing the body, and ensure ClientResponseError is in your retry_if_exception_type list.
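Neither raise_for_status() nor tenacity's wait_exponential reads the Retry-After header that often accompanies a 429, so you may want to honor it yourself. The stdlib sketch below parses both forms the header can take (delay-seconds or an HTTP-date, per RFC 9110); the function name, the fallback default, and the 60 s cap are assumptions. In a tenacity setup, a custom wait callable could read the headers attribute of the ClientResponseError and pass them to this helper.

```python
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime
from typing import Mapping


def retry_after_seconds(headers: Mapping[str, str], default: float, cap: float = 60.0) -> float:
    """Seconds to wait according to Retry-After, or *default* if absent or unparseable."""
    value = headers.get("Retry-After")
    if value is None:
        return default
    value = value.strip()
    if value.isdigit():  # delay-seconds form, e.g. "7"
        return min(float(value), cap)
    try:  # HTTP-date form, e.g. "Wed, 21 Oct 2015 07:28:00 GMT"
        when = parsedate_to_datetime(value)
    except (TypeError, ValueError):  # TypeError on older Pythons, ValueError on 3.10+
        return default
    if when.tzinfo is None:
        when = when.replace(tzinfo=timezone.utc)
    delta = (when - datetime.now(timezone.utc)).total_seconds()
    return min(max(delta, 0.0), cap)  # a date in the past means "retry now"
```

aiohttp exposes response headers as a case-insensitive mapping; with a plain dict the lookup above is case-sensitive.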

asyncio.TimeoutError floods the log under load

Root cause: Provider P99 latency spikes above ClientTimeout(total=10) during peak periods. Fix: Raise total to 20 seconds and add a before_sleep log hook (already included in the implementation) so each retry attempt is visible without flooding at ERROR level.

Pydantic ValidationError on field lat/lon

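Root cause: Provider returns coordinates as strings ("lat": "37.4") rather than floats. Pydantic v2's default lax mode already converts numeric strings like this to float, so the error points to a model running in strict mode or to a value that is not numeric at all (an empty string, null, or a missing key). Fix: Add a before-mode field validator that coerces explicitly and turns bad input into a clear ValueError:

from typing import Optional
from pydantic import BaseModel, field_validator

class GeocodeResult(BaseModel):
    address: str
    lat: float
    lon: float
    confidence: Optional[float] = None
    provider: str

    @field_validator("lat", "lon", mode="before")
    @classmethod
    def coerce_float(cls, v: object) -> float:
        try:
            return float(v)
        except (TypeError, ValueError):  # Pydantic only reports ValueError as a ValidationError
            raise ValueError(f"not a number: {v!r}") from None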

RuntimeError: Session is closed when reusing the geocoder

Root cause: The ClientSession was closed by __aexit__ and the same AsyncGeocoder instance was used for a second batch. Fix: Use a fresh async with AsyncGeocoder(...) context for each pipeline invocation, or restructure the caller to pass the entire address list in one process_batch call.


FAQ

What concurrency level should I set for the asyncio.Semaphore?

Start at floor(QPS) - 2 where QPS is your provider’s documented requests-per-second limit. Leave two slots for retries and health checks. Increase only after confirming via response headers that the provider has available headroom.

Why do I get event-loop errors when running asyncio inside a Jupyter notebook?

Jupyter already runs an event loop. Use await geocoder.process_batch(addresses) directly inside a notebook cell, or install nest_asyncio and call nest_asyncio.apply() at the top of the notebook.

How do I avoid exhausting the aiohttp connector’s connection pool?

Set TCPConnector(limit=...) to a value at or above your Semaphore concurrency ceiling. Mismatches cause tasks to queue inside aiohttp itself, masking the true concurrency level and making latency measurement unreliable.

Can I mix sync and async geocoding calls in the same pipeline?

Yes, but isolate them. Wrap any synchronous provider SDK in asyncio.to_thread() so it runs in a thread pool without blocking the event loop. Never call blocking I/O directly inside a coroutine.
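A minimal sketch of that isolation, assuming a hypothetical blocking_sdk_geocode function in place of a real provider SDK (time.sleep stands in for its blocking network call). Each call runs in the default thread pool via asyncio.to_thread, and gather() collects the results in input order.

```python
import asyncio
import time
from typing import List


def blocking_sdk_geocode(address: str) -> str:
    """Hypothetical synchronous SDK call; time.sleep simulates blocking I/O."""
    time.sleep(0.1)
    return f"resolved:{address}"


async def geocode_with_sync_sdk(addresses: List[str]) -> List[str]:
    # Each blocking call runs on a worker thread, so the event loop stays free.
    results = await asyncio.gather(
        *(asyncio.to_thread(blocking_sdk_geocode, a) for a in addresses)
    )
    return list(results)
```

Four calls finish in roughly the time of one. The default executor is a ThreadPoolExecutor with min(32, os.cpu_count() + 4) workers, so large batches still need the semaphore and rate limiter in front of it.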

How do I preserve the original row order when using asyncio.gather()?

asyncio.gather() preserves the order of its input awaitables in the returned list, so zipping the results list against the original addresses list is safe and correct — as the _process_chunk implementation demonstrates.
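A short demonstration: the coroutines below finish in a different order from the one they were passed in, yet gather() returns their results in input order. The names and delays are illustrative.

```python
import asyncio
from typing import List, Tuple


async def delayed_echo(value: str, delay: float, finished: List[str]) -> str:
    await asyncio.sleep(delay)
    finished.append(value)  # record completion order
    return value


async def gather_order_demo() -> Tuple[List[str], List[str]]:
    finished: List[str] = []
    results = await asyncio.gather(
        delayed_echo("first", 0.06, finished),
        delayed_echo("second", 0.02, finished),
        delayed_echo("third", 0.04, finished),
    )
    return list(results), finished
```

The results list comes back as first, second, third, while the completion log reads second, third, first.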