How to Map Road Networks to Graph Nodes and Edges
Routing engines degrade when spatial topology is approximated rather than explicitly modeled. The core engineering challenge lies in converting raw GIS linestrings into a deterministic, directed graph. This transformation demands precise intersection snapping, directional weight calculation, and rigorous topology validation. Understanding how to map road networks to graph nodes and edges eliminates routing artifacts, phantom intersections, and planner ambiguity before they reach production environments.
Raw road segments rarely align at exact coordinate boundaries. Floating-point drift, survey inaccuracies, and multi-source data ingestion create overlapping geometries that break shortest-path algorithms. The solution requires decomposing raw linestrings into atomic edges, resolving every physical intersection to a single canonical node, and attaching directional constraints exclusively to edges. This strict separation ensures the graph query planner operates on unambiguous topology.
1. Coordinate Quantization & Geometry Normalization
Spatial graphs fail when vertices are treated as continuous floating-point values. A 15-decimal coordinate difference between two theoretically identical intersections will spawn duplicate nodes, fragmenting the graph and inflating traversal costs. The first step is deterministic coordinate quantization.
We round all coordinates to a fixed precision (typically 6 decimal places for meter-scale accuracy) and apply topological cleaning. shapely’s buffer(0) idiom resolves self-intersections and invalid ring orientations, while simplify() removes redundant collinear vertices without altering geometric topology. This normalization phase is non-negotiable for production pipelines.
2. Atomic Edge Decomposition & Intersection Snapping
Road networks must be treated as planar graphs. When two linestrings cross, they must be split at the exact intersection point. This requires computing the unary union boundary, extracting valid intersection geometries, and splitting each source segment at every valid crossing.
The resulting fragments become atomic edges. Each edge retains its original metadata (road class, speed limit, surface type) but is now topologically isolated. Directional attributes like one-way restrictions, turn prohibitions, and time-dependent weights attach to the edge, not the vertex. This architecture prevents the planner from misinterpreting bidirectional edges during Dijkstra or A* execution.
3. Directed Graph Construction with Spatial Weights
Once geometries are split, we map endpoints to canonical node IDs and compute traversal costs. For lat/lon datasets, Euclidean distance introduces systematic error. We apply the Haversine formula to compute geodesic edge weights, ensuring routing costs reflect real-world travel distances.
import math
from typing import Dict, List, Tuple
import geopandas as gpd
import networkx as nx
from shapely.geometry import LineString, MultiLineString, MultiPoint, Point
from shapely.ops import split, unary_union
EARTH_RADIUS_M = 6_371_000
def haversine_distance(p1: Point, p2: Point) -> float:
"""Compute geodesic distance between two lat/lon points in meters.
Inputs follow Shapely's (x, y) convention, i.e. (longitude, latitude).
"""
lat1, lon1 = math.radians(p1.y), math.radians(p1.x)
lat2, lon2 = math.radians(p2.y), math.radians(p2.x)
dlat = lat2 - lat1
dlon = lon2 - lon1
a = math.sin(dlat / 2) ** 2 + math.cos(lat1) * math.cos(lat2) * math.sin(dlon / 2) ** 2
return 2 * EARTH_RADIUS_M * math.asin(math.sqrt(a))
def quantize_coords(geom: LineString, precision: int = 6) -> LineString:
"""Round coordinates to fixed precision to prevent floating-point drift."""
rounded = [(round(x, precision), round(y, precision)) for x, y in geom.coords]
return LineString(rounded)
def _extract_intersection_points(geoms: List[LineString]) -> List[Point]:
"""Pairwise intersect each linestring with the rest, keeping only true crossings.
Using ``union.intersection(union)`` is a no-op (any geometry intersected with
itself is itself); the correct planar approach is the pairwise intersection
of the combined boundary with itself via segmentation, but for road networks
it is cheaper to compare each segment against the union of the others.
"""
points: List[Point] = []
full_union = unary_union(geoms)
for g in geoms:
others = full_union.difference(g)
crossing = g.intersection(others)
if crossing.is_empty:
continue
if isinstance(crossing, Point):
points.append(crossing)
elif isinstance(crossing, MultiPoint):
points.extend(crossing.geoms)
# LineString overlaps (collinear segments) are ignored: they will be
# snapped via endpoint quantisation, not intersected.
return points
def build_spatial_graph(gdf_roads: gpd.GeoDataFrame, precision: int = 6) -> nx.DiGraph:
"""Convert raw road linestrings into a directed spatial graph with geodesic weights."""
gdf = gdf_roads.copy()
gdf["geometry"] = gdf.geometry.apply(
lambda g: quantize_coords(g, precision).buffer(0).simplify(1e-5)
)
split_pts = _extract_intersection_points(list(gdf.geometry))
G = nx.DiGraph()
node_id_map: Dict[Tuple[float, float], int] = {}
next_id = 0
for _, row in gdf.iterrows():
line = row.geometry
# Split this line at every interior intersection point that lies on it.
segments: List[LineString] = [line]
for pt in split_pts:
new_segments: List[LineString] = []
for seg in segments:
if seg.distance(pt) < 1e-6:
result = split(seg, pt)
new_segments.extend(result.geoms if isinstance(result, MultiLineString) else [result])
else:
new_segments.append(seg)
segments = new_segments
for seg in segments:
start = Point(seg.coords[0])
end = Point(seg.coords[-1])
for pt in (start, end):
coord_key = (round(pt.x, precision), round(pt.y, precision))
if coord_key not in node_id_map:
node_id_map[coord_key] = next_id
G.add_node(next_id, lat=pt.y, lon=pt.x)
next_id += 1
u = node_id_map[(round(start.x, precision), round(start.y, precision))]
v = node_id_map[(round(end.x, precision), round(end.y, precision))]
weight_m = haversine_distance(start, end)
is_oneway = str(row.get("oneway", "no")).lower() in ("yes", "1", "true")
G.add_edge(u, v, weight=weight_m, oneway=is_oneway, length_m=weight_m)
if not is_oneway:
G.add_edge(v, u, weight=weight_m, oneway=False, length_m=weight_m)
return G
4. Async Ingestion & Connection Pooling
In-memory graph construction scales poorly beyond ~500k edges. Production systems stream atomic edges into a spatial graph database using async drivers and connection pooling. The following pattern batches node/edge creation via Cypher, leveraging MERGE for idempotency and explicit transaction boundaries to prevent lock contention.
import asyncio
from neo4j import AsyncGraphDatabase, AsyncSession
from neo4j.exceptions import Neo4jError
from typing import List, Dict, Any
URI = "bolt://localhost:7687"
AUTH = ("neo4j", "secure_password")
BATCH_SIZE = 5000
async def ingest_graph_batch(session: AsyncSession, batch_nodes: List[Dict], batch_edges: List[Dict]):
"""Batch ingest nodes and edges with spatial indexing hints."""
node_cypher = """
UNWIND $nodes AS n
MERGE (v:RoadNode {id: n.id})
SET v.lat = n.lat, v.lon = n.lon
"""
edge_cypher = """
UNWIND $edges AS e
MATCH (u:RoadNode {id: e.u}), (v:RoadNode {id: e.v})
MERGE (u)-[r:CONNECTS {direction: e.dir}]->(v)
SET r.weight = e.weight, r.length_m = e.length_m, r.oneway = e.oneway
"""
await session.run(node_cypher, nodes=batch_nodes)
await session.run(edge_cypher, edges=batch_edges)
async def stream_to_graph_db(G: nx.DiGraph, uri: str, auth: tuple):
driver = AsyncGraphDatabase.driver(uri, auth=auth, max_connection_pool_size=20)
async with driver.session() as session:
# Create spatial index upfront for query planner optimization
await session.run("CREATE INDEX road_node_spatial IF NOT EXISTS FOR (n:RoadNode) ON (n.lat, n.lon)")
nodes = [{"id": n, "lat": d["lat"], "lon": d["lon"]} for n, d in G.nodes(data=True)]
edges = [{"u": u, "v": v, "weight": d["weight"], "length_m": d["length_m"],
"oneway": d["oneway"], "dir": f"{u}_{v}"} for u, v, d in G.edges(data=True)]
for i in range(0, len(nodes), BATCH_SIZE):
await ingest_graph_batch(session, nodes[i:i+BATCH_SIZE], edges[i:i+BATCH_SIZE])
await driver.close()
5. Topology Validation & Production Diagnostics
Spatial graphs rarely load cleanly. Incomplete survey data, disconnected cul-de-sacs, and malformed OSM tags generate dangling edges that trap routing algorithms. Run degree centrality checks immediately post-ingestion to isolate vertices with exactly one incident edge. Filter or flag these based on business logic (e.g., dead-ends vs. data errors).
def diagnose_topology(G: nx.DiGraph) -> Dict[str, Any]:
degrees = dict(G.degree())
dangling = [n for n, d in degrees.items() if d == 1]
isolated = [n for n, d in degrees.items() if d == 0]
return {
"total_nodes": G.number_of_nodes(),
"total_edges": G.number_of_edges(),
"dangling_nodes": len(dangling),
"isolated_nodes": len(isolated),
"avg_degree": sum(degrees.values()) / len(degrees) if degrees else 0
}
Scaling & Performance Trade-offs
- Memory vs. Streaming:
networkxloads the entire topology into RAM. For continental-scale networks, switch toigraphor stream directly to a distributed graph engine using chunked async batches. - Index Fragmentation: Frequent
MERGEoperations on spatial coordinates cause B-tree fragmentation. Drop and re-create the affected indexes during a maintenance window (e.g.DROP INDEX road_node_spatial; CREATE INDEX road_node_spatial FOR (n:RoadNode) ON (n.lat, n.lon);) to reclaim planner efficiency. - Query Planner Optimization: Always supply explicit spatial bounds in routing queries. Unbounded
MATCHtraversals force full-graph scans. UseWHERE n.lat BETWEEN $min_lat AND $max_latto leverage spatial scoping and multi-tenancy boundaries. - Weight Calibration: Geodesic distance alone ignores traffic, elevation, and turn penalties. Augment edge weights with dynamic cost functions post-ingestion, but keep the base topology static to avoid index thrashing.
Mastering this pipeline ensures your routing layer operates on mathematically sound, query-optimized topology. When spatial mapping is deterministic, the graph query planner executes shortest-path algorithms without heuristic fallbacks, delivering sub-50ms latency even under high-concurrency mobility workloads. For deeper architectural patterns, consult the Spatial Graph Database Fundamentals for Python pillar documentation.