Use Python and Redis to track geocoding API spend in real time: increment atomic float counters on every call, compare them to per-provider daily budgets, and automatically route traffic away from any provider that exceeds its limit — all as part of API Quota Tracking & Cost Management.
Redis Key Schema for Geocoding Cost Tracking
The entire tracking system rests on three predictable key patterns. Get these right and every other operation — increment, threshold check, fallback routing — becomes a single-line Redis command.
| Key pattern | Type | Purpose |
|---|---|---|
spend:{provider}:{YYYY-MM-DD} |
String (float) | Cumulative daily cost in dollars |
quota:{provider}:{YYYY-MM-DD} |
String (int) | Cumulative daily request count |
config:{provider} |
Hash | rate_per_call, daily_budget, max_requests |
Daily keys expire automatically at 86 400 seconds (one UTC day), eliminating manual reset jobs. Provider configs are permanent hashes updated whenever pricing changes.
Why this schema and not a sorted set or stream? For geocoding cost control, you need two operations: atomic increment and threshold read. Both are O(1) on a plain string key. Streams and sorted sets add overhead without benefit for this use-case.
Component Breakdown
| Component | Redis command | Why it is necessary |
|---|---|---|
| Float spend accumulator | INCRBYFLOAT spend:{p}:{d} 0.005 |
Geocoding costs are fractional; integer counters lose precision at scale |
| Integer quota counter | INCR quota:{p}:{d} |
Integer is sufficient and cheaper than float for request counts |
| TTL on daily keys | EXPIRE key 86400 |
Auto-resets at day boundary; no cron job required |
| Config hash | HGETALL config:{p} |
Single round-trip to fetch all provider limits |
| Pipeline batch | pipe.execute() |
Sends increment + two EXPIRE calls in one TCP round-trip |
Minimal Runnable Implementation
Install the official client: pip install redis.
import datetime
import redis
from typing import Dict, List, Optional, Tuple
# Module-level connection pool — one pool shared across all threads.
_POOL: Optional[redis.ConnectionPool] = None
def get_pool(redis_url: str = "redis://localhost:6379/0") -> redis.ConnectionPool:
global _POOL
if _POOL is None:
_POOL = redis.ConnectionPool.from_url(
redis_url,
max_connections=50,
decode_responses=True,
socket_connect_timeout=2,
)
return _POOL
class GeocodingSpendTracker:
"""Atomic, real-time spend and quota tracker backed by Redis.
All writes use pipelining (one TCP round-trip per call). Reads
use a second pipeline so threshold evaluation is also one round-trip.
"""
def __init__(self, redis_url: str = "redis://localhost:6379/0") -> None:
self.r = redis.Redis(connection_pool=get_pool(redis_url))
# ------------------------------------------------------------------
# Private helpers
# ------------------------------------------------------------------
@staticmethod
def _today() -> str:
return datetime.date.today().isoformat()
def _keys(self, provider: str) -> Tuple[str, str, str]:
today = GeocodingSpendTracker._today()
return (
f"spend:{provider}:{today}",
f"quota:{provider}:{today}",
f"config:{provider}",
)
# ------------------------------------------------------------------
# Write path
# ------------------------------------------------------------------
def record_request(
self, provider: str, cost_per_call: float
) -> Dict[str, float]:
"""Atomically increment spend and quota; return updated state.
The pipeline is NOT a MULTI/EXEC transaction — individual commands
are atomic but intermediate state is visible to other clients.
For strict billing isolation use a Lua script (see Edge Cases).
"""
spend_key, quota_key, _ = self._keys(provider)
pipe = self.r.pipeline(transaction=False)
pipe.incr(quota_key)
pipe.incrbyfloat(spend_key, cost_per_call)
# Set TTL only when key is first created; resetting on every call
# can push expiry past the billing midnight boundary. Use EXPIREAT
# with the next-UTC-midnight epoch if strict alignment matters.
pipe.expire(spend_key, 86_400)
pipe.expire(quota_key, 86_400)
results = pipe.execute()
return {
"quota": int(results[0]),
"spend": round(float(results[1]), 6),
}
# ------------------------------------------------------------------
# Read path
# ------------------------------------------------------------------
def check_threshold(self, provider: str) -> Dict[str, object]:
"""Return budget/quota status for provider without modifying state."""
spend_key, quota_key, config_key = self._keys(provider)
pipe = self.r.pipeline(transaction=False)
pipe.hgetall(config_key)
pipe.get(spend_key)
pipe.get(quota_key)
config, raw_spend, raw_quota = pipe.execute()
if not config:
# Provider has no config — treat as unconstrained.
return {"over_budget": False, "over_quota": False}
daily_limit: float = float(config.get("daily_budget", "inf"))
max_requests: int = int(config.get("max_requests", 0))
current_spend: float = float(raw_spend or 0)
current_quota: int = int(raw_quota or 0)
return {
"over_budget": current_spend >= daily_limit,
"over_quota": bool(max_requests and current_quota >= max_requests),
"current_spend": current_spend,
"current_quota": current_quota,
}
# ------------------------------------------------------------------
# Routing helper
# ------------------------------------------------------------------
def get_active_provider(
self, provider_list: List[str]
) -> Optional[str]:
"""Return the first provider within both budget and quota limits.
Returns None when all providers are exhausted — the caller should
queue the address for retry rather than dropping it silently.
"""
for provider in provider_list:
status = self.check_threshold(provider)
if not status["over_budget"] and not status["over_quota"]:
return provider
return None
# ------------------------------------------------------------------
# Vectorized usage (pandas)
# ------------------------------------------------------------------
def apply_provider_selection(
df: "pd.DataFrame",
tracker: GeocodingSpendTracker,
provider_list: List[str],
) -> "pd.DataFrame":
"""Add an 'assigned_provider' column to a DataFrame of addresses.
Calls get_active_provider once per row. For large batches (>10 k rows)
pre-fetch all threshold states outside the loop and pass as a dict.
"""
import pandas as pd # local import keeps the module importable without pandas
df = df.copy()
df["assigned_provider"] = df.index.map(
lambda _: tracker.get_active_provider(provider_list)
)
return df
Flow Diagram
Edge Cases and Failure Modes
1. TTL pushing expiry past the billing midnight
EXPIRE spend:{p}:{date} 86400 resets the clock on every call. If calls continue near midnight the key survives past the billing boundary, double-counting into the next day’s budget. Fix: set the TTL only at key creation using SET … EX or a Lua check, or replace EXPIRE with EXPIREAT pointing to the next UTC midnight:
import calendar, datetime
def _seconds_until_midnight_utc() -> int:
now = datetime.datetime.utcnow()
midnight = (now + datetime.timedelta(days=1)).replace(
hour=0, minute=0, second=0, microsecond=0
)
return int((midnight - now).total_seconds())
# In record_request, after the pipeline:
ttl = _seconds_until_midnight_utc()
pipe.expire(spend_key, ttl)
pipe.expire(quota_key, ttl)
2. Non-atomic pipeline leaving partial state
A plain pipeline sends commands in one round-trip but does not use MULTI/EXEC, so another client can read an intermediate state (quota incremented, spend not yet). For financial auditing where exact atomicity matters, wrap the increment in a Lua script:
LUA_RECORD = """
local quota = redis.call('INCR', KEYS[1])
local spend = redis.call('INCRBYFLOAT', KEYS[2], ARGV[1])
return {quota, spend}
"""
_record_script = None # module-level, registered once
def record_atomic(r: redis.Redis, provider: str, cost: float) -> Dict[str, float]:
global _record_script
if _record_script is None:
_record_script = r.register_script(LUA_RECORD)
today = datetime.date.today().isoformat()
quota_key = f"quota:{provider}:{today}"
spend_key = f"spend:{provider}:{today}"
results = _record_script(keys=[quota_key, spend_key], args=[cost])
return {"quota": int(results[0]), "spend": round(float(results[1]), 6)}
3. Redis unreachable — fail open, not stalled
Geocoding throughput must not block on a non-critical counter update. If Redis is down, log the missed increment and continue:
def safe_record(
tracker: GeocodingSpendTracker,
provider: str,
cost: float,
) -> Optional[Dict[str, float]]:
try:
return tracker.record_request(provider, cost)
except redis.RedisError:
# Log for later reconciliation; do not halt the pipeline.
import logging
logging.warning("Redis unavailable — spend counter skipped for %s", provider)
return None
On recovery, reconcile skipped increments from your application logs or from the geocoding provider’s own usage API.
Integration Note
This tracker is the enforcement layer inside the API Quota Tracking & Cost Management workflow. Call get_active_provider() before every geocoding dispatch; call record_request() immediately after a successful response. The provider_list you pass should follow the same priority order you configure for implementing fallback chains for failed lookups — so budget exhaustion and HTTP errors both collapse to the same graceful degradation path. For high-throughput pipelines that send thousands of addresses per minute, combine this tracker with rate-limiting strategies for batch processing to pace outbound request volume independently of spend limits.
Related
- API Quota Tracking & Cost Management — the parent workflow covering provider selection policies, budget tiers, and alerting strategies.
- Implementing Fallback Chains for Failed Lookups — how to chain providers so HTTP errors and budget exhaustion both route to the same fallback sequence.
- Rate-Limiting Strategies for Batch Processing — pacing outbound geocoding volume to stay within per-second request limits across parallel workers.
- Building Async Geocoding Requests in Python — how to integrate this synchronous spend tracker into an
asyncio-based geocoding pipeline without blocking the event loop.