Building Automated OSM to Graph ETL Pipelines

Routing solvers fail when raw OpenStreetMap data reaches production with unresolved topological fragmentation. Building automated OSM-to-graph ETL pipelines demands deterministic coordinate snapping, strict schema enforcement, and idempotent batch loading. This guide isolates the edge deduplication and spatial normalization bottleneck and presents a Python transformation layer that yields a deduplicated, consistently snapped graph for downstream mobility and logistics engines.

Topological Diagnostics & Fragmentation Analysis

Raw PBF exports contain overlapping ways, floating nodes, and micro-duplications. These artifacts fragment the adjacency structure, so shortest-path algorithms return no route, or a needlessly long detour, between points that are connected on the ground. Baseline validation begins with degree distribution analysis. Isolated vertices (degree zero) signal failed snapping or coordinate drift outside the target bounding box. We count repeated hashes of unordered coordinate pairs to quantify fragmentation density before any graph serialization occurs.

Diagnostic queries should isolate:

  1. Orphaned Nodes: Coordinates with zero incident edges after initial parsing.
  2. Micro-Edges: Segments shorter than 0.5 meters that introduce numerical instability in routing weights.
  3. Duplicate Hashes: Identical edge signatures derived from unordered (source, target) tuples.

Establishing this baseline reveals the exact fragmentation density and dictates the required snapping tolerance. Without it, downstream OSM Data Ingestion Pipelines will propagate geometric debt into production routing graphs.
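The three diagnostics above can be sketched in plain Python before any graph database is involved. This is a minimal illustration, not the pipeline's actual validator: the function names, the dictionary-based input layout, and the haversine distance are assumptions chosen to keep the example self-contained.

```python
import hashlib
import math
from collections import Counter

EARTH_RADIUS_M = 6_371_008.8  # mean Earth radius; adequate for short segments


def haversine_m(lat1, lon1, lat2, lon2):
    """Great-circle distance in meters between two WGS84 points."""
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = phi2 - phi1
    dlmb = math.radians(lon2 - lon1)
    a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlmb / 2) ** 2
    return 2 * EARTH_RADIUS_M * math.asin(math.sqrt(a))


def edge_signature(source, target):
    """Order-independent signature: (a, b) and (b, a) hash identically."""
    lo, hi = sorted((source, target))
    return hashlib.sha256(f"{lo}|{hi}".encode()).hexdigest()


def fragmentation_report(nodes, edges, micro_edge_m=0.5):
    """nodes: {node_id: (lat, lon)}; edges: iterable of (source, target) ids."""
    degree = Counter()
    signatures = Counter()
    micro_edges = []
    for s, t in edges:
        degree[s] += 1
        degree[t] += 1
        signatures[edge_signature(s, t)] += 1
        if haversine_m(*nodes[s], *nodes[t]) < micro_edge_m:
            micro_edges.append((s, t))
    return {
        # Nodes that never appear as an endpoint of any edge.
        "orphaned_nodes": sorted(n for n in nodes if degree[n] == 0),
        # Segments shorter than the micro-edge threshold.
        "micro_edges": micro_edges,
        # Number of surplus copies beyond the first for each signature.
        "duplicate_edges": sum(c - 1 for c in signatures.values() if c > 1),
    }
```

Dividing `duplicate_edges` by the total edge count gives one possible fragmentation density figure to compare across extracts.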

Metric Projection & Spatial Normalization

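Spatial snapping on raw latitude/longitude coordinates is invalid for meter-based tolerances: a degree of longitude shrinks with latitude (meridian convergence), so a fixed angular threshold covers a different ground distance at every latitude. We project coordinates into a metric space before indexing. A local transverse Mercator (UTM) zone keeps projected distances close to ground meters; Web Mercator (EPSG:3857) inflates distances by roughly 1/cos(latitude), so a tolerance expressed in ground meters must be scaled by the same factor before querying the index. A partitioned k-d tree then handles millions of nodes without exhausting heap memory, with each partition of ~500,000 vertices processed in parallel; the listing below shows the transformation for a single partition. A 1.5-meter snapping tolerance merges proximate endpoints, balancing geometric fidelity against routing graph stability. The transformation executes entirely in-memory using PyArrow arrays and NumPy, avoiding pandas overhead.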
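To make the effect of the projection choice concrete, the short sketch below estimates how a ground tolerance translates into Web Mercator units. It relies on the spherical approximation that EPSG:3857 stretches distances by about 1/cos(latitude); the helper names are hypothetical and exist only for this illustration.

```python
import math


def web_mercator_scale(lat_deg: float) -> float:
    """Approximate ratio of EPSG:3857 distance to ground distance at a latitude."""
    return 1.0 / math.cos(math.radians(lat_deg))


def projected_tolerance(ground_tolerance_m: float, lat_deg: float) -> float:
    """Query radius in EPSG:3857 units that covers ground_tolerance_m on the ground."""
    return ground_tolerance_m * web_mercator_scale(lat_deg)


if __name__ == "__main__":
    for lat in (0.0, 45.0, 60.0):
        print(f"lat={lat:5.1f}  radius={projected_tolerance(1.5, lat):.3f}")
```

At 60° latitude the factor is about 2, so an unscaled 1.5 m radius in EPSG:3857 units only covers roughly 0.75 m on the ground. A UTM zone avoids this correction within its area of validity.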

import hashlib

import numpy as np
import pyarrow as pa
from neo4j import AsyncGraphDatabase
from pyproj import Transformer
from scipy.spatial import cKDTree


class SpatialGraphETL:
    def __init__(self, uri: str, user: str, password: str, pool_size: int = 25):
        self.driver = AsyncGraphDatabase.driver(
            uri, auth=(user, password), max_connection_pool_size=pool_size
        )

    async def ingest_batch(self, batch_edges: list[dict]):
        """Transactional upsert with deterministic edge hashing."""
        query = """
        UNWIND $batch AS row
        MERGE (s:Node {id: row.source})
        MERGE (t:Node {id: row.target})
        MERGE (s)-[r:CONNECTS {hash: row.edge_hash}]->(t)
        ON CREATE SET r.weight = row.weight, r.surface = row.surface
        ON MATCH SET r.updated_at = timestamp()
        """
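        async def _upsert(tx):
            result = await tx.run(query, batch=batch_edges)
            await result.consume()  # surface server-side errors inside the transaction

        # execute_write (neo4j driver 5.x) runs a managed transaction: it commits on
        # success and retries transient failures, which is safe to repeat because
        # the query is MERGE-based.
        async with self.driver.session() as session:
            await session.execute_write(_upsert)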

    async def close(self):
        await self.driver.close()


def normalize_topology_pyarrow(
    nodes_table: pa.Table,
    edges_table: pa.Table,
    tolerance_m: float = 1.5,
) -> pa.Table:
    """Snap proximate nodes via a metric k-d tree, then emit deduplicated edges
    that carry every attribute the Cypher upsert expects.

    ``nodes_table`` columns: ``node_id``, ``lat``, ``lon``.
    ``edges_table`` columns: ``source``, ``target``, ``weight``, ``surface``.
    Returns an Arrow table with columns: ``source``, ``target``, ``edge_hash``,
    ``weight``, ``surface``.
    """
    # 1. Project to metric space for accurate Euclidean distance
    transformer = Transformer.from_crs("EPSG:4326", "EPSG:3857", always_xy=True)
    x, y = transformer.transform(
        nodes_table.column("lon").to_numpy(),
        nodes_table.column("lat").to_numpy(),
    )

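    # 2. k-d tree spatial index & proximity query.
    # EPSG:3857 stretches distances by ~1/cos(latitude), so widen the query radius
    # by that factor (at the mean latitude) to keep tolerance_m in ground meters.
    lat = nodes_table.column("lat").to_numpy()
    radius = tolerance_m / np.cos(np.deg2rad(lat.mean()))
    tree = cKDTree(np.column_stack((x, y)))
    pairs = tree.query_pairs(r=radius, output_type="ndarray")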

    # 3. Union-Find for connected component resolution
    parent = np.arange(len(nodes_table))

    def find(i: int) -> int:
        path = []
        while parent[i] != i:
            path.append(i)
            i = parent[i]
        for node in path:
            parent[node] = i
        return i

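    def union(i: int, j: int) -> None:
        root_i, root_j = find(i), find(j)
        if root_i != root_j:
            # Keep the smaller row index as root so the canonical node does not
            # depend on the order in which the k-d tree returns pairs.
            lo, hi = (root_i, root_j) if root_i < root_j else (root_j, root_i)
            parent[hi] = lo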

    for i, j in pairs:
        union(int(i), int(j))

    # 4. Canonical id mapping (node row index -> canonical row index -> node_id)
    node_ids = nodes_table.column("node_id").to_numpy()
    id_index = {nid: idx for idx, nid in enumerate(node_ids)}
    canonical_for = np.array([node_ids[find(i)] for i in range(len(node_ids))])

    src_canonical = np.array([canonical_for[id_index[s]] for s in edges_table.column("source").to_numpy()])
    tgt_canonical = np.array([canonical_for[id_index[t]] for t in edges_table.column("target").to_numpy()])

    # 5. Drop self-loops introduced by snapping
    keep = src_canonical != tgt_canonical
    src_canonical = src_canonical[keep]
    tgt_canonical = tgt_canonical[keep]
    weights = edges_table.column("weight").to_numpy()[keep]
    surfaces = edges_table.column("surface").to_numpy(zero_copy_only=False)[keep]

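    # 6. Deterministic, order-independent edge hash.
    # Store endpoints in a canonical (smaller id first) order so the directed
    # MERGE in the Cypher upsert matches the same relationship on every run,
    # regardless of the direction in which the input listed the edge.
    swap = src_canonical > tgt_canonical
    src_canonical, tgt_canonical = (
        np.where(swap, tgt_canonical, src_canonical),
        np.where(swap, src_canonical, tgt_canonical),
    )
    edge_hashes = np.array([
        hashlib.sha256(f"{s}|{t}".encode()).hexdigest()
        for s, t in zip(src_canonical, tgt_canonical)
    ])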

    edges = pa.table({
        "source": src_canonical,
        "target": tgt_canonical,
        "edge_hash": edge_hashes,
        "weight": weights,
        "surface": surfaces,
    })

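    # Arrow has no drop_duplicates(); group by the hash and keep the first row.
    # "first" is an ordered aggregation, so threading must be disabled.
    grouped = edges.group_by("edge_hash", use_threads=False).aggregate([
        ("source", "first"), ("target", "first"),
        ("weight", "first"), ("surface", "first"),
    ])
    # aggregate() emits the aggregated columns first and the key column last.
    grouped = grouped.rename_columns(["source", "target", "weight", "surface", "edge_hash"])
    return grouped.select(["source", "target", "edge_hash", "weight", "surface"])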

Idempotent Ingestion & Async Execution

The deduplicated Arrow table streams directly into the graph driver. Full graph reloads are avoided by leveraging transactional upserts and connection pooling. Schema enforcement depends on a uniqueness constraint on Node.id, which must exist before loading: without it, concurrent MERGE operations can create duplicate nodes. With the constraint in place, MERGE on the deterministic edge hash prevents phantom edges during concurrent writes. Async execution keeps I/O non-blocking during high-throughput ingestion, so network round trips overlap with the transformation stage instead of stalling it.

// Idempotent edge ingestion with schema enforcement
UNWIND $batch AS row
MERGE (s:Node {id: row.source})
MERGE (t:Node {id: row.target})
MERGE (s)-[r:CONNECTS {hash: row.edge_hash}]->(t)
ON CREATE SET r.weight = row.weight, r.surface = row.surface
ON MATCH SET r.updated_at = timestamp()

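Connection pooling is configured with max_connection_pool_size tuned to the database’s thread pool limit. Batches of 10,000–50,000 edges are a reasonable starting range for balancing transaction overhead against memory pressure; the best value depends on property payload size and server heap. When the pool is exhausted, the Neo4j async driver makes callers wait for a free connection until its acquisition timeout expires; it does not throttle the producer. Explicit asyncio.gather() with a semaphore limit therefore prevents connection exhaustion during regional-scale imports.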
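One way to bound concurrency is sketched below using only the standard library. The `ingest_batch` argument stands in for any coroutine that loads one batch, such as `SpatialGraphETL.ingest_batch`; the helper names `chunked` and `ingest_all`, and the default of eight in-flight batches, are assumptions for illustration rather than tuned values.

```python
import asyncio
from typing import Awaitable, Callable, Iterator, Sequence


def chunked(rows: Sequence[dict], size: int) -> Iterator[Sequence[dict]]:
    """Yield consecutive slices of at most `size` rows."""
    for start in range(0, len(rows), size):
        yield rows[start:start + size]


async def ingest_all(
    rows: Sequence[dict],
    ingest_batch: Callable[[Sequence[dict]], Awaitable[None]],
    batch_size: int = 10_000,
    max_in_flight: int = 8,
) -> int:
    """Run ingest_batch over every chunk with at most max_in_flight concurrent calls."""
    semaphore = asyncio.Semaphore(max_in_flight)

    async def guarded(batch: Sequence[dict]) -> int:
        # The semaphore caps how many batches hold a pooled connection at once.
        async with semaphore:
            await ingest_batch(batch)
        return len(batch)

    counts = await asyncio.gather(*(guarded(b) for b in chunked(rows, batch_size)))
    return sum(counts)
```

Keeping `max_in_flight` at or below the pool size means no task waits on connection acquisition. Concurrent batches that touch the same nodes can still contend for locks, so partitioning batches spatially may help.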

Scaling Trade-offs & Production Hardening

Spatial graph construction requires deliberate trade-offs between geometric precision and routing performance. A tighter snapping tolerance (≤0.5m) preserves curb-level accuracy but increases vertex count and adjacency matrix sparsity. A looser tolerance (≥3.0m) accelerates ingestion but introduces artificial shortcuts in pedestrian routing networks. For logistics applications, we recommend dynamic tolerance scaling based on road class: 1.0m for residential streets, 2.5m for motorways.
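A road-class lookup for the tolerance could look like the following sketch. Only the residential and motorway values come from the recommendation above; the 1.5 m fallback reuses the baseline tolerance from earlier in this guide, and resolving a shared node to the tightest tolerance among its incident ways is an assumption, not an established rule.

```python
# Residential and motorway values follow the recommendation in the text;
# every other class falls back to the 1.5 m baseline.
TOLERANCE_BY_HIGHWAY_M = {
    "residential": 1.0,
    "motorway": 2.5,
}
DEFAULT_TOLERANCE_M = 1.5


def snapping_tolerance(highway_tags):
    """Return a snapping tolerance for a node from the highway tags of its ways.

    Picks the tightest tolerance among the incident classes (an assumption) so a
    node shared by a motorway ramp and a residential street is not over-merged.
    """
    tolerances = [
        TOLERANCE_BY_HIGHWAY_M.get(tag, DEFAULT_TOLERANCE_M) for tag in highway_tags
    ]
    return min(tolerances, default=DEFAULT_TOLERANCE_M)
```

Because a k-d tree pair query takes a single radius, one option is to query with the largest tolerance in use and then discard pairs whose distance exceeds the tolerance resolved for either endpoint.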

Memory constraints dictate partition sizing. SciPy's cKDTree stores coordinates as float64, so each 2-D point costs at least 16 bytes for the coordinates alone, before the index array and tree nodes are counted. For continental-scale datasets, chunking by administrative boundaries or UTM zones prevents OOM failures. Validation should run post-ingestion: verify that the largest connected component contains ≥99.8% of routable edges, and audit the remaining isolated subgraphs to decide whether they need bypass routing or indicate snapping failures.
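The connectivity check can be prototyped without a graph database. The sketch below treats edges as undirected and measures the share of edges in the component that holds the most edges; the function name and the in-memory edge list are assumptions for illustration.

```python
from collections import Counter


def largest_component_edge_share(edges):
    """Fraction of edges inside the connected component that holds the most edges.

    edges: iterable of (source, target) pairs, treated as undirected.
    """
    parent = {}

    def find(node):
        parent.setdefault(node, node)
        root = node
        while parent[root] != root:
            root = parent[root]
        while parent[node] != root:  # path compression
            parent[node], node = root, parent[node]
        return root

    edges = list(edges)
    for s, t in edges:
        root_s, root_t = find(s), find(t)
        if root_s != root_t:
            parent[root_t] = root_s

    if not edges:
        return 0.0
    # Both endpoints of an edge share a root, so the source is enough to label it.
    edges_per_component = Counter(find(s) for s, _ in edges)
    return max(edges_per_component.values()) / len(edges)
```

A pipeline gate could then fail the load when the returned share drops below 0.998.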

When integrated into broader Spatial Graph Construction & OSM Ingestion workflows, this pipeline guarantees deterministic graph topology. Attribute synchronization, POI enrichment, and async batch processing layers can safely attach to the normalized node/edge tables without risking topological drift. The result is a resilient, audit-ready routing substrate that scales from municipal micro-mobility networks to transcontinental freight corridors.