Tracking API Spend with Python and Redis

Use Python and Redis to track geocoding API spend in real time: increment atomic float counters on every call, compare them to per-provider daily budgets, and automatically route traffic away from any provider that exceeds its limit — all as part of API Quota Tracking & Cost Management.

Redis Key Schema for Geocoding Cost Tracking

The entire tracking system rests on three predictable key patterns. Get these right and every other operation — increment, threshold check, fallback routing — becomes a single-line Redis command.

Key pattern Type Purpose
spend:{provider}:{YYYY-MM-DD} String (float) Cumulative daily cost in dollars
quota:{provider}:{YYYY-MM-DD} String (int) Cumulative daily request count
config:{provider} Hash rate_per_call, daily_budget, max_requests

Daily keys expire automatically at 86 400 seconds (one UTC day), eliminating manual reset jobs. Provider configs are permanent hashes updated whenever pricing changes.

Why this schema and not a sorted set or stream? For geocoding cost control, you need two operations: atomic increment and threshold read. Both are O(1) on a plain string key. Streams and sorted sets add overhead without benefit for this use-case.

Component Breakdown

Component Redis command Why it is necessary
Float spend accumulator INCRBYFLOAT spend:{p}:{d} 0.005 Geocoding costs are fractional; integer counters lose precision at scale
Integer quota counter INCR quota:{p}:{d} Integer is sufficient and cheaper than float for request counts
TTL on daily keys EXPIRE key 86400 Auto-resets at day boundary; no cron job required
Config hash HGETALL config:{p} Single round-trip to fetch all provider limits
Pipeline batch pipe.execute() Sends increment + two EXPIRE calls in one TCP round-trip

Minimal Runnable Implementation

Install the official client: pip install redis.

import datetime
import redis

from typing import Dict, List, Optional, Tuple

# Module-level connection pool — one pool shared across all threads.
_POOL: Optional[redis.ConnectionPool] = None


def get_pool(redis_url: str = "redis://localhost:6379/0") -> redis.ConnectionPool:
    global _POOL
    if _POOL is None:
        _POOL = redis.ConnectionPool.from_url(
            redis_url,
            max_connections=50,
            decode_responses=True,
            socket_connect_timeout=2,
        )
    return _POOL


class GeocodingSpendTracker:
    """Atomic, real-time spend and quota tracker backed by Redis.

    All writes use pipelining (one TCP round-trip per call). Reads
    use a second pipeline so threshold evaluation is also one round-trip.
    """

    def __init__(self, redis_url: str = "redis://localhost:6379/0") -> None:
        self.r = redis.Redis(connection_pool=get_pool(redis_url))

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _today() -> str:
        return datetime.date.today().isoformat()

    def _keys(self, provider: str) -> Tuple[str, str, str]:
        today = GeocodingSpendTracker._today()
        return (
            f"spend:{provider}:{today}",
            f"quota:{provider}:{today}",
            f"config:{provider}",
        )

    # ------------------------------------------------------------------
    # Write path
    # ------------------------------------------------------------------

    def record_request(
        self, provider: str, cost_per_call: float
    ) -> Dict[str, float]:
        """Atomically increment spend and quota; return updated state.

        The pipeline is NOT a MULTI/EXEC transaction — individual commands
        are atomic but intermediate state is visible to other clients.
        For strict billing isolation use a Lua script (see Edge Cases).
        """
        spend_key, quota_key, _ = self._keys(provider)

        pipe = self.r.pipeline(transaction=False)
        pipe.incr(quota_key)
        pipe.incrbyfloat(spend_key, cost_per_call)
        # Set TTL only when key is first created; resetting on every call
        # can push expiry past the billing midnight boundary. Use EXPIREAT
        # with the next-UTC-midnight epoch if strict alignment matters.
        pipe.expire(spend_key, 86_400)
        pipe.expire(quota_key, 86_400)
        results = pipe.execute()

        return {
            "quota": int(results[0]),
            "spend": round(float(results[1]), 6),
        }

    # ------------------------------------------------------------------
    # Read path
    # ------------------------------------------------------------------

    def check_threshold(self, provider: str) -> Dict[str, object]:
        """Return budget/quota status for provider without modifying state."""
        spend_key, quota_key, config_key = self._keys(provider)

        pipe = self.r.pipeline(transaction=False)
        pipe.hgetall(config_key)
        pipe.get(spend_key)
        pipe.get(quota_key)
        config, raw_spend, raw_quota = pipe.execute()

        if not config:
            # Provider has no config — treat as unconstrained.
            return {"over_budget": False, "over_quota": False}

        daily_limit: float = float(config.get("daily_budget", "inf"))
        max_requests: int = int(config.get("max_requests", 0))
        current_spend: float = float(raw_spend or 0)
        current_quota: int = int(raw_quota or 0)

        return {
            "over_budget": current_spend >= daily_limit,
            "over_quota": bool(max_requests and current_quota >= max_requests),
            "current_spend": current_spend,
            "current_quota": current_quota,
        }

    # ------------------------------------------------------------------
    # Routing helper
    # ------------------------------------------------------------------

    def get_active_provider(
        self, provider_list: List[str]
    ) -> Optional[str]:
        """Return the first provider within both budget and quota limits.

        Returns None when all providers are exhausted — the caller should
        queue the address for retry rather than dropping it silently.
        """
        for provider in provider_list:
            status = self.check_threshold(provider)
            if not status["over_budget"] and not status["over_quota"]:
                return provider
        return None


# ------------------------------------------------------------------
# Vectorized usage (pandas)
# ------------------------------------------------------------------

def apply_provider_selection(
    df: "pd.DataFrame",
    tracker: GeocodingSpendTracker,
    provider_list: List[str],
) -> "pd.DataFrame":
    """Add an 'assigned_provider' column to a DataFrame of addresses.

    Calls get_active_provider once per row. For large batches (>10 k rows)
    pre-fetch all threshold states outside the loop and pass as a dict.
    """
    import pandas as pd  # local import keeps the module importable without pandas

    df = df.copy()
    df["assigned_provider"] = df.index.map(
        lambda _: tracker.get_active_provider(provider_list)
    )
    return df

Flow Diagram

Geocoding API Spend Tracking Flow A left-to-right flow showing an incoming address being checked against Redis budget counters, routed to an active geocoding provider, with a fallback path when a provider is over budget. Address batch in get_active _provider() check_threshold ×n Redis spend:p:date / quota:p:date Geocode call (active provider) record_request() INCR + INCRBYFLOAT Fallback / queue retry Normalised address out HGETALL / GET within budget all exhausted

Edge Cases and Failure Modes

1. TTL pushing expiry past the billing midnight

EXPIRE spend:{p}:{date} 86400 resets the clock on every call. If calls continue near midnight the key survives past the billing boundary, double-counting into the next day’s budget. Fix: set the TTL only at key creation using SET … EX or a Lua check, or replace EXPIRE with EXPIREAT pointing to the next UTC midnight:

import calendar, datetime

def _seconds_until_midnight_utc() -> int:
    now = datetime.datetime.utcnow()
    midnight = (now + datetime.timedelta(days=1)).replace(
        hour=0, minute=0, second=0, microsecond=0
    )
    return int((midnight - now).total_seconds())

# In record_request, after the pipeline:
ttl = _seconds_until_midnight_utc()
pipe.expire(spend_key, ttl)
pipe.expire(quota_key, ttl)

2. Non-atomic pipeline leaving partial state

A plain pipeline sends commands in one round-trip but does not use MULTI/EXEC, so another client can read an intermediate state (quota incremented, spend not yet). For financial auditing where exact atomicity matters, wrap the increment in a Lua script:

LUA_RECORD = """
local quota = redis.call('INCR', KEYS[1])
local spend = redis.call('INCRBYFLOAT', KEYS[2], ARGV[1])
return {quota, spend}
"""

_record_script = None  # module-level, registered once

def record_atomic(r: redis.Redis, provider: str, cost: float) -> Dict[str, float]:
    global _record_script
    if _record_script is None:
        _record_script = r.register_script(LUA_RECORD)
    today = datetime.date.today().isoformat()
    quota_key = f"quota:{provider}:{today}"
    spend_key = f"spend:{provider}:{today}"
    results = _record_script(keys=[quota_key, spend_key], args=[cost])
    return {"quota": int(results[0]), "spend": round(float(results[1]), 6)}

3. Redis unreachable — fail open, not stalled

Geocoding throughput must not block on a non-critical counter update. If Redis is down, log the missed increment and continue:

def safe_record(
    tracker: GeocodingSpendTracker,
    provider: str,
    cost: float,
) -> Optional[Dict[str, float]]:
    try:
        return tracker.record_request(provider, cost)
    except redis.RedisError:
        # Log for later reconciliation; do not halt the pipeline.
        import logging
        logging.warning("Redis unavailable — spend counter skipped for %s", provider)
        return None

On recovery, reconcile skipped increments from your application logs or from the geocoding provider’s own usage API.

Integration Note

This tracker is the enforcement layer inside the API Quota Tracking & Cost Management workflow. Call get_active_provider() before every geocoding dispatch; call record_request() immediately after a successful response. The provider_list you pass should follow the same priority order you configure for implementing fallback chains for failed lookups — so budget exhaustion and HTTP errors both collapse to the same graceful degradation path. For high-throughput pipelines that send thousands of addresses per minute, combine this tracker with rate-limiting strategies for batch processing to pace outbound request volume independently of spend limits.