Data Engineering

11 min read

May 6, 2026

Multi-Source Market Data: Normalization, Gap Filling, and Integrity Checks

How a trading platform ingests market data from four sources with different schemas, handles source failures mid-project without touching the rest of the pipeline, detects and fills gaps continuously, and validates data quality before it reaches model training.

PythonPostgreSQLPolygonBinanceData Engineering

The multi-source problem

The platform pulls market data from four sources, split between equities and crypto.

For equities: Polygon is the primary source. A WebSocket connection streams live 1-minute bars during market hours. REST endpoints fill historical data and backfill gaps. Timestamps are UTC, the schema is clean JSON, and volume coverage is near-complete.

For crypto: two sources handle live data. CoinGecko REST provides 5-minute OHLCV bars on a cron schedule running every five minutes. GeckoTerminal provides 1-minute OHLCV live and is the primary historical backfill source. Both are free tier with no paid API contracts. Binance serves as the third crypto source: REST endpoints for gap-filling recent windows, and the execution API for submitting crypto orders.

Each source has a distinct format challenge. Polygon and CoinGecko use clean JSON with UTC timestamps. Binance returns kline data as arrays rather than objects, so field access is positional by index, and timestamps are millisecond epoch integers that need conversion to UTC before storage. GeckoTerminal has more variable response latency and a schema that changed at least once mid-project.

The normalization challenge was not just format conversion. It was designing a pipeline where each source's data arrived at PostgreSQL in a canonical form that the feature pipeline could query without knowing which source produced a given row.

The adapter pattern

Each source has a dedicated collector class that inherits from a shared abstract base. The base defines the interface: fetch a symbol-timeframe pair for a date range, normalize the raw response into the canonical schema, and store the result. The collector handles the raw fetch. The normalizer handles schema conversion. Nothing downstream knows or cares which collector produced a given row.

collectors/base.py
from abc import ABC, abstractmethod
from dataclasses import dataclass
from datetime import datetime
import pandas as pd

@dataclass
class OHLCVBar:
    symbol:    str
    timeframe: str
    source:    str
    timestamp: datetime  # always UTC
    open:      float
    high:      float
    low:       float
    close:     float
    volume:    float
    vwap:      float | None = None

class BaseCollector(ABC):
    source: str

    @abstractmethod
    def fetch(
        self,
        symbol: str,
        timeframe: str,
        start: datetime,
        end: datetime,
    ) -> list[OHLCVBar]: ...

    def normalize(self, raw: dict) -> OHLCVBar: ...

    def store(self, bars: list[OHLCVBar], session) -> int:
        # Upsert by (symbol, timeframe, source, timestamp)
        # Returns count of new rows inserted
        ...

The source field on every row matters when the same symbol is covered by multiple collectors. It lets quality checks compare coverage across sources and detect when one source's data diverges from another's.

Handling source failures

Midway through the project, the Binance WebSocket volume stream started returning inconsistent readings on some symbols, with reported volume 10x the expected values. The cause was not clear from the API documentation and debugging the WebSocket integration would have taken time with an uncertain outcome.

The decision was to replace it. The REST historical endpoint was reliable. WebSocket was only adding live updates. Switching to polling the REST endpoint on a short interval sacrificed a small amount of latency for reliability. The change touched one collector class. The PostgreSQL schema was unchanged. The feature pipeline was unchanged. Models already trained on Binance data continued to work.

This is the practical value of the adapter boundary in a project like this. Data sources are not permanent. APIs change, rate limits change, reliability changes. Each source being an interchangeable unit means you can respond to those changes without rewriting the pipeline around them. When a source proves unreliable, you replace the collector and move on.

A separate discovery came after the WebSocket fix was in place. Binance.US USD pairs had structural low volume coverage: around 7.6% of expected trading volume, because most crypto activity flows through USDT pairs rather than USD. This was not a data quality error. It was a characteristic of the market. Volume-based features calculated from Binance.US USD data were unreliable regardless of how clean the ingestion was. The response was to exclude volume features from the crypto feature set. Crypto models train on 12 features. Equity models train on 14. The same FeaturePipeline handles both, configured differently by experiment.

Gap detection and continuous filling

Collectors do not run as single historical dumps. They run continuously and include a gap filler that detects missing windows and schedules backfills.

A gap is any expected bar that is absent from the database. For a symbol with 1-hour bars, the gap filler queries the expected timestamps for a trailing window and compares them against what is in PostgreSQL. Missing timestamps are grouped into contiguous ranges and fetched as batch requests from the source.

collectors/gap_filler.py
from datetime import datetime, timezone, timedelta
import pandas as pd

def find_gaps(
    symbol: str,
    timeframe: str,
    source: str,
    lookback_days: int,
    session,
) -> list[tuple[datetime, datetime]]:
    now   = datetime.now(timezone.utc)
    start = now - timedelta(days=lookback_days)

    freq_map = {"1h": "1h", "4h": "4h", "1d": "1D"}
    expected = pd.date_range(start=start, end=now, freq=freq_map[timeframe], tz="UTC")

    existing = {
        row.timestamp
        for row in session.query(OHLCVRow.timestamp)
        .filter_by(symbol=symbol, timeframe=timeframe, source=source)
        .filter(OHLCVRow.timestamp >= start)
        .all()
    }

    missing = [ts for ts in expected if ts not in existing]
    if not missing:
        return []

    # Group into contiguous ranges to minimise API calls
    gaps, range_start, prev = [], missing[0], missing[0]
    for ts in missing[1:]:
        if ts - prev > pd.Timedelta(freq_map[timeframe]):
            gaps.append((range_start, prev))
            range_start = ts
        prev = ts
    gaps.append((range_start, prev))
    return gaps

Gap filler schedule is per-source. Polygon fills on a short schedule during market hours. CoinGecko runs on the 5-minute cron cadence but is rate-limited on the free tier, so backfilling large windows requires spreading requests. GeckoTerminal is the primary route for deeper crypto historical backfills.

Quality checks before training

Gap filling is necessary but not sufficient. Data can be present and still wrong. Prices with zero volume across a window. Timestamps out of order. Suspiciously flat close prices that suggest a stale feed. A zero-volume bar is more likely a feed error than a genuinely tradeless period.

The training runner runs quality checks against raw OHLCV data before any feature calculation. These checks are a hard gate: training does not proceed on data that fails.

pipeline/quality.py
import pandas as pd

def check_data_quality(
    df: pd.DataFrame,
    min_bars: int = 1000,
    max_gap_pct: float = 0.02,
    min_volume_coverage: float = 0.95,
) -> list[str]:
    failures = []

    if len(df) < min_bars:
        failures.append(f"insufficient data: {len(df)} bars, need {min_bars}")

    if not df["timestamp"].is_monotonic_increasing:
        failures.append("timestamps not monotonically increasing")

    expected_count = (
        (df["timestamp"].max() - df["timestamp"].min())
        / pd.Timedelta("1h") + 1
    )
    gap_pct = 1 - len(df) / expected_count
    if gap_pct > max_gap_pct:
        failures.append(
            f"gap rate {gap_pct:.1%} exceeds threshold {max_gap_pct:.1%}"
        )

    volume_coverage = (df["volume"] > 0).mean()
    if volume_coverage < min_volume_coverage:
        failures.append(
            f"volume coverage {volume_coverage:.1%} below threshold {min_volume_coverage:.1%}"
        )

    return failures

A training run that fails a quality check is logged with the failure reason and skipped. The model is not trained. This is preferable to training on bad data and discovering the problem after the model passes the gate.