Node and Edge Spatial Mapping
Node and Edge Spatial Mapping transforms raw coordinate streams into deterministic, traversable graph topology. In production logistics, mobility, and spatial analytics platforms, this layer dictates routing accuracy, constraint enforcement, and query latency. Without strict geometry-to-topology conversion rules, spatial predicates degrade into expensive full-table scans and non-deterministic pathfinding.
The architecture of this process follows the principles set out in Spatial Graph Database Fundamentals for Python. Engineers must treat spatial coordinates as immutable anchors while deriving connectivity through explicit geometric relationships. Real-world networks rarely arrive as clean adjacency lists; they require snapping-tolerance enforcement, intersection detection, and rigorous topology validation before ingestion.
Coordinate Reference System Normalization
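Coordinate reference system (CRS) normalization is a mandatory prerequisite for graph construction. WGS84 (EPSG:4326) remains the ingestion standard for global datasets, but its coordinates are angles, not lengths: a degree of longitude spans about 111 km at the equator and shrinks with the cosine of latitude, so planar distance math on raw longitude/latitude corrupts edge lengths and routing costs. Compute edge metrics either in a suitable planar projection or geodesically on the ellipsoid. Regional deployments benefit from UTM zones or local state plane projections. Web Mercator (EPSG:3857) is the standard for map tiles, but it inflates distances by roughly 1/cos(latitude), so continental routing is better served by geodesic lengths or by a custom projection chosen to minimize distortion over the region.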
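To make the distortion concrete, the sketch below compares a stdlib haversine distance (spherical Earth, mean radius 6,371,008.8 m) with the Euclidean distance between the same two points in spherical Web Mercator. The helper names are illustrative; pyproj's `Geod` would give slightly different, ellipsoidal values.

```python
import math
from typing import Tuple

WEB_MERCATOR_R = 6_378_137.0  # sphere radius used by EPSG:3857
MEAN_EARTH_R = 6_371_008.8    # mean Earth radius for the haversine comparison


def haversine_m(lon1: float, lat1: float, lon2: float, lat2: float) -> float:
    """Great-circle distance on a spherical Earth, in metres."""
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = phi2 - phi1
    dlmb = math.radians(lon2 - lon1)
    a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlmb / 2) ** 2
    return 2 * MEAN_EARTH_R * math.asin(math.sqrt(a))


def web_mercator_xy(lon: float, lat: float) -> Tuple[float, float]:
    """Forward spherical Web Mercator (EPSG:3857) projection."""
    x = WEB_MERCATOR_R * math.radians(lon)
    y = WEB_MERCATOR_R * math.log(math.tan(math.pi / 4 + math.radians(lat) / 2))
    return x, y


def mercator_distance_m(lon1: float, lat1: float, lon2: float, lat2: float) -> float:
    """Euclidean distance measured in projected Web Mercator units."""
    x1, y1 = web_mercator_xy(lon1, lat1)
    x2, y2 = web_mercator_xy(lon2, lat2)
    return math.hypot(x2 - x1, y2 - y1)


# Compare a one-degree east-west segment at several latitudes
for lat in (0.0, 45.0, 60.0):
    true_m = haversine_m(0.0, lat, 1.0, lat)
    merc_m = mercator_distance_m(0.0, lat, 1.0, lat)
    print(f"lat {lat:>4}: haversine {true_m:9.0f} m, "
          f"Web Mercator {merc_m:9.0f} m, ratio {merc_m / true_m:.2f}")
```

For a one-degree east-west segment, the Web Mercator length is close to the true length at the equator and roughly double it at 60° latitude, tracking 1/cos(latitude).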
Projection pipelines should stream features rather than materializing entire datasets in memory. Generator-based processing prevents out-of-memory failures during large-scale network imports and enables backpressure-aware ingestion.
```python
import json
from pathlib import Path
from typing import Any, AsyncIterator, Dict

from pyproj import Geod, Transformer
from shapely.geometry import LineString, shape

# Geodetic calculator for accurate metric edge lengths on the WGS84 ellipsoid
geod = Geod(ellps="WGS84")


def build_crs_transformer(
    from_crs: str = "EPSG:4326", to_crs: str = "EPSG:3857"
) -> Transformer:
    # always_xy=True keeps (lon, lat) / (x, y) axis order for every CRS
    return Transformer.from_crs(from_crs, to_crs, always_xy=True)


async def stream_normalized_geometries(
    geojson_path: Path,
    transformer: Transformer,
) -> AsyncIterator[Dict[str, Any]]:
    """Yield edge dicts derived from a newline-delimited GeoJSON LineString file.

    Each yielded dict has stable keys for direct Cypher ingestion:
    ``edge_id``, ``source_id``, ``target_id``,
    ``src_lat``, ``src_lon``, ``tgt_lat``, ``tgt_lon``,
    ``length_m``, ``speed_limit``, ``zone_id``.

    ``transformer`` is currently unused: ``length_m`` is computed geodesically
    on the original WGS84 coordinates, which avoids projection distortion.
    """
    # Synchronous reads, one line at a time: memory stays flat for large files
    with open(geojson_path, "r", encoding="utf-8") as f:
        for line in f:
            if not line.strip():
                continue
            feature = json.loads(line)
            geom = shape(feature["geometry"])
            # buffer(0) only repairs polygons; on a LineString it returns an empty
            # polygon. Skip empty, invalid, and multi-part geometries instead.
            if geom.is_empty or not geom.is_valid or not isinstance(geom, LineString):
                continue
            # Geodesic length over the original WGS84 coords
            coords = list(geom.coords)  # [(lon, lat), ...] or [(lon, lat, z), ...]
            length_m = sum(
                geod.inv(p1[0], p1[1], p2[0], p2[1])[2]  # [2] is the distance in metres
                for p1, p2 in zip(coords[:-1], coords[1:])
            )
            src_lon, src_lat = coords[0][:2]
            tgt_lon, tgt_lat = coords[-1][:2]
            props = feature["properties"]
            yield {
                "edge_id": props["id"],
                "source_id": props["source_id"],
                "target_id": props["target_id"],
                "src_lat": src_lat, "src_lon": src_lon,
                "tgt_lat": tgt_lat, "tgt_lon": tgt_lon,
                "length_m": round(length_m, 3),
                "speed_limit": props.get("speed_kmh", 50),
                "zone_id": props.get("zone_id"),
            }
```
Topology Construction and Intersection Detection
Raw road segments and transit corridors require precise node extraction at geometric intersections. Naive pairwise comparison scales quadratically (O(N²)): 50k segments already yield about 1.25 billion candidate pairs, which is untenable. Production systems use spatial hashing or grid partitioning to accelerate candidate matching, applying the same Spatial Indexing Strategies that reduce candidate sets before topology materialization.
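A minimal sketch of grid partitioning, assuming edges have already been projected to metric x/y coordinates and broken into straight two-point segments. Each segment is hashed into every grid cell its bounding box touches, and only segments that share a cell are tested. `cells_for`, `intersect`, and `find_intersections` are hypothetical helpers, not part of Shapely or any other library; parallel and collinear overlaps are deliberately left out.

```python
from collections import defaultdict
from itertools import combinations
from math import floor
from typing import Dict, Iterator, List, Optional, Set, Tuple

Point = Tuple[float, float]
Segment = Tuple[Point, Point]


def cells_for(seg: Segment, cell: float) -> Iterator[Tuple[int, int]]:
    """Yield every grid cell touched by the segment's bounding box."""
    (x1, y1), (x2, y2) = seg
    for ix in range(floor(min(x1, x2) / cell), floor(max(x1, x2) / cell) + 1):
        for iy in range(floor(min(y1, y2) / cell), floor(max(y1, y2) / cell) + 1):
            yield ix, iy


def intersect(a: Segment, b: Segment) -> Optional[Point]:
    """Crossing or touching point of two segments; parallel/collinear -> None."""
    (px, py), (px2, py2) = a
    (qx, qy), (qx2, qy2) = b
    rx, ry, sx, sy = px2 - px, py2 - py, qx2 - qx, qy2 - qy
    denom = rx * sy - ry * sx  # 2D cross product r x s
    if denom == 0:
        return None  # collinear overlaps need a separate check
    t = ((qx - px) * sy - (qy - py) * sx) / denom  # position along a
    u = ((qx - px) * ry - (qy - py) * rx) / denom  # position along b
    if 0.0 <= t <= 1.0 and 0.0 <= u <= 1.0:
        return (px + t * rx, py + t * ry)
    return None


def find_intersections(segments: List[Segment], cell: float) -> List[Tuple[int, int, Point]]:
    buckets: Dict[Tuple[int, int], List[int]] = defaultdict(list)
    for idx, seg in enumerate(segments):
        for key in cells_for(seg, cell):
            buckets[key].append(idx)
    # Only segments sharing a cell are compared; the set drops pairs seen in several cells
    candidates: Set[Tuple[int, int]] = set()
    for members in buckets.values():
        candidates.update(combinations(members, 2))
    hits: List[Tuple[int, int, Point]] = []
    for i, j in sorted(candidates):
        pt = intersect(segments[i], segments[j])
        if pt is not None:
            hits.append((i, j, pt))
    return hits
```

Consecutive segments of the same linestring also report their shared vertex; that is expected, since every hit becomes a candidate node for snapping. A cell size near the typical segment length is a reasonable starting point: tiny cells make long segments touch many cells, while huge cells drift back toward pairwise comparison.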
Node deduplication must account for floating-point precision drift. Apply a deterministic snapping tolerance (typically 0.5m to 2.0m depending on data source accuracy) before merging intersection points. Coordinates that fall within the tolerance radius collapse into a single graph node, preserving planar topology while preventing phantom edges.
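One way to make the tolerance deterministic is a grid whose cell size equals the tolerance, so every existing node within range sits in the 3x3 block of cells around a query point. The `NodeSnapper` class below is a hypothetical sketch that assumes projected coordinates in metres; the first point seen becomes the canonical location, so feed points in a stable order (for example sorted by edge id) to get reproducible results.

```python
from math import floor, hypot, inf
from typing import Dict, List, Tuple


class NodeSnapper:
    """Merge projected (x, y) points lying within `tolerance_m` of an existing node."""

    def __init__(self, tolerance_m: float = 1.0) -> None:
        self.tol = tolerance_m
        self.nodes: List[Tuple[float, float]] = []           # canonical coordinates
        self._grid: Dict[Tuple[int, int], List[int]] = {}    # cell -> node ids

    def _cell(self, x: float, y: float) -> Tuple[int, int]:
        # Cells are exactly one tolerance wide
        return floor(x / self.tol), floor(y / self.tol)

    def snap(self, x: float, y: float) -> int:
        """Return the id of the nearest node within tolerance, creating one if needed."""
        cx, cy = self._cell(x, y)
        best, best_d = -1, inf
        for dx in (-1, 0, 1):
            for dy in (-1, 0, 1):
                for nid in self._grid.get((cx + dx, cy + dy), []):
                    nx, ny = self.nodes[nid]
                    d = hypot(nx - x, ny - y)
                    if d <= self.tol and d < best_d:
                        best, best_d = nid, d
        if best >= 0:
            return best  # reuse the nearest existing node
        nid = len(self.nodes)
        self.nodes.append((x, y))
        self._grid.setdefault((cx, cy), []).append(nid)
        return nid
```

Because merged points never become nodes themselves, a chain of points each within tolerance of the next cannot drag a node progressively away from its first position.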
The diagram below shows the same physical crossroads expressed first as raw GIS linestrings and then as the directed graph that emerges after snapping and splitting at intersections. The left subgraph holds the input linestrings; the labelled nodes on the right are the canonical vertices a routing algorithm actually traverses, and the edges between them carry attributes such as length and speed.
```mermaid
flowchart LR
    subgraph GEOM["Linestrings (input)"]
        direction LR
        L1["Way A: (lon,lat) ..."]
        L2["Way B: (lon,lat) ..."]
        L3["Way C: (lon,lat) ..."]
    end
    subgraph GRAPH["Directed graph (output)"]
        direction LR
        N1((n1)) -- "edge e1<br/>length, speed" --> N2((n2))
        N2 -- "e2" --> N3((n3))
        N2 -- "e3" --> N4((n4))
        N4 -- "e4" --> N3
    end
    GEOM --> Norm["Quantise + snap<br/>tolerance ~0.5 m"]
    Norm --> Split["Split at intersections<br/>extract atomic edges"]
    Split --> GRAPH
    classDef raw fill:#fbfaf6,stroke:#cdc6b3,color:#455062;
    classDef step fill:#f6f0e6,stroke:#b58900,color:#5b3a0d;
    classDef out fill:#e9f5f4,stroke:#0e7c86,color:#0e7c86;
    class L1,L2,L3 raw
    class Norm,Split step
    class N1,N2,N3,N4 out
```
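The "split at intersections" step can be sketched without geometry at all, assuming each way already references shared integer node ids (as OpenStreetMap ways do, or as a snapping step would assign). A node that appears more than once across all ways is a junction, and each way is cut there into atomic edges. `split_ways` and `Edge` are illustrative names.

```python
from collections import Counter
from typing import Dict, List, NamedTuple


class Edge(NamedTuple):
    way_id: str
    source: int
    target: int
    nodes: List[int]  # full vertex chain, kept so the geometry can be rebuilt


def split_ways(ways: Dict[str, List[int]]) -> List[Edge]:
    """Cut each way at every node shared with another way (or repeated in itself)."""
    usage = Counter(n for chain in ways.values() for n in chain)
    edges: List[Edge] = []
    for way_id, chain in ways.items():
        start = 0
        for i in range(1, len(chain)):
            # A junction node or the way's last node closes the current edge
            if usage[chain[i]] > 1 or i == len(chain) - 1:
                edges.append(Edge(way_id, chain[start], chain[i], chain[start : i + 1]))
                start = i
    return edges
```

One-way handling is omitted; a production splitter would also emit a reversed edge for every two-way street so the directed graph matches the real network.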
Async Streaming Pipeline with Connection Pooling
Modern spatial ETL relies on non-blocking I/O and connection pooling to saturate graph database throughput without exhausting heap memory. The following pattern shows an async pipeline that batches normalized geometries, draws sessions from a pooled driver, and commits each batch as its own transaction. As written it sends one batch at a time, so only one pooled connection is busy at any moment.
```python
from pathlib import Path
from typing import Any, Dict, List

from neo4j import AsyncGraphDatabase

# build_crs_transformer and stream_normalized_geometries come from the block above.


class SpatialGraphIngestor:
    def __init__(self, uri: str, user: str, password: str, pool_size: int = 8):
        self.driver = AsyncGraphDatabase.driver(
            uri, auth=(user, password), max_connection_pool_size=pool_size
        )

    async def close(self) -> None:
        await self.driver.close()

    async def ingest_batch(self, batch: List[Dict[str, Any]]) -> None:
        """Execute parameterized Cypher with spatial properties.

        Expects each row to carry the fields emitted by
        ``stream_normalized_geometries``: edge endpoints plus their coordinates,
        so we can MERGE both intersections and the connecting segment in one pass.
        MERGE on Intersection.id needs a uniqueness constraint on that property;
        without it every MERGE falls back to scanning all Intersection nodes.
        """
        query = """
        UNWIND $batch AS row
        MERGE (s:Intersection {id: row.source_id})
          ON CREATE SET s.location = point({latitude: row.src_lat, longitude: row.src_lon}),
                        s.zone = row.zone_id
        MERGE (t:Intersection {id: row.target_id})
          ON CREATE SET t.location = point({latitude: row.tgt_lat, longitude: row.tgt_lon}),
                        t.zone = row.zone_id
        MERGE (s)-[e:ROAD_SEGMENT {id: row.edge_id}]->(t)
        SET e.length_m = row.length_m,
            e.speed_limit = row.speed_limit
        """

        async def _write(tx) -> None:
            result = await tx.run(query, batch=batch)
            await result.consume()  # surface server errors inside the transaction

        async with self.driver.session(database="neo4j") as session:
            # Managed write transaction: commits on success and retries transient
            # errors, which is safe because MERGE + SET is idempotent.
            await session.execute_write(_write)

    async def run_pipeline(
        self,
        geojson_path: Path,
        batch_size: int = 10_000,
    ) -> None:
        transformer = build_crs_transformer()
        batch: List[Dict[str, Any]] = []
        async for geom_data in stream_normalized_geometries(geojson_path, transformer):
            batch.append(geom_data)
            if len(batch) >= batch_size:
                await self.ingest_batch(batch)
                batch.clear()  # safe: the awaited write has already finished
        if batch:
            await self.ingest_batch(batch)
```
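Because `run_pipeline` awaits each write before reading further, the pool's extra connections stay idle. A minimal sketch of bounded concurrency, assuming any async `write_batch` callable (for example `ingestor.ingest_batch`): a semaphore caps the number of in-flight writes, and the source is not advanced while every slot is busy, which gives the streaming pipeline real backpressure. `abatched` and `ingest_concurrently` are hypothetical helpers.

```python
import asyncio
from typing import AsyncIterator, Awaitable, Callable, List, TypeVar

T = TypeVar("T")


async def abatched(source: AsyncIterator[T], size: int) -> AsyncIterator[List[T]]:
    """Group an async stream into lists of at most `size` items."""
    batch: List[T] = []
    async for item in source:
        batch.append(item)
        if len(batch) >= size:
            yield batch
            batch = []  # fresh list: the previous one may still be in flight
    if batch:
        yield batch


async def ingest_concurrently(
    source: AsyncIterator[T],
    write_batch: Callable[[List[T]], Awaitable[None]],
    batch_size: int = 10_000,
    max_in_flight: int = 4,
) -> None:
    """Run up to `max_in_flight` batch writes at once."""
    slots = asyncio.Semaphore(max_in_flight)
    tasks: List[asyncio.Task] = []

    async def _run(batch: List[T]) -> None:
        try:
            await write_batch(batch)
        finally:
            slots.release()

    async for batch in abatched(source, batch_size):
        # Backpressure: the source is not read further until a slot frees up
        await slots.acquire()
        tasks.append(asyncio.create_task(_run(batch)))
    # Failures surface here, after the stream is exhausted (first error is raised)
    await asyncio.gather(*tasks)
```

Keep `max_in_flight` at or below the driver's connection pool size. Concurrent batches that MERGE the same Intersection nodes contend for locks and can deadlock; Neo4j reports deadlocks as transient errors, which managed transactions (`session.execute_write`) retry. Note that the sketch starts a fresh list per batch, because clearing a shared list as the sequential loop does would corrupt batches still being written.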
Bulk Ingestion and Query Execution
Graph databases execute Cypher statements transactionally. Bulk inserts require careful batching to avoid transaction log exhaustion and lock contention. Each batch should contain 5,000 to 20,000 records depending on available JVM heap and disk I/O throughput. Parameterized batches prevent Cypher query plan cache thrashing and ensure predictable memory consumption.
When querying mapped topology, spatial predicates must align with planner expectations. The Graph Query Planner Optimization framework demonstrates how spatial filters interact with index scans and expand operations. Anchor every lookup on an indexed property (for example a uniqueness constraint on Intersection.id, or a point index on location) before MATCH expansions or CALL routing procedures, so the planner starts from an index seek instead of a label scan.
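The lookups above depend on indexes that exist before ingestion starts. A sketch, assuming Neo4j 5 schema syntax and the labels used in this section, plus a nearest-intersection query whose distance predicate the point index can serve. The statement list, index names, `NEAREST_INTERSECTION`, and the `ensure_schema` helper are illustrative; `driver` is expected to behave like the neo4j `AsyncDriver`.

```python
from typing import Any, List

# Hypothetical schema bootstrap for the labels used in this section (Neo4j 5 syntax)
SCHEMA_STATEMENTS: List[str] = [
    # Backs MERGE (:Intersection {id: ...}) and MATCH (src:Intersection {id: $source_id})
    "CREATE CONSTRAINT intersection_id IF NOT EXISTS "
    "FOR (n:Intersection) REQUIRE n.id IS UNIQUE",
    # Lets point.distance() predicates on location use an index seek
    "CREATE POINT INDEX intersection_location IF NOT EXISTS "
    "FOR (n:Intersection) ON (n.location)",
    # Speeds up lookups of individual segments by id, e.g. for incremental updates
    "CREATE INDEX road_segment_id IF NOT EXISTS "
    "FOR ()-[r:ROAD_SEGMENT]-() ON (r.id)",
]

# Nearest intersection to a GPS fix, bounded by a search radius in metres
NEAREST_INTERSECTION = """
WITH point({latitude: $lat, longitude: $lon}) AS p
MATCH (n:Intersection)
WHERE point.distance(n.location, p) < $radius_m
RETURN n.id AS id, point.distance(n.location, p) AS dist_m
ORDER BY dist_m
LIMIT 1
"""


async def ensure_schema(driver: Any, database: str = "neo4j") -> None:
    """Apply each schema statement in its own auto-commit transaction."""
    async with driver.session(database=database) as session:
        for stmt in SCHEMA_STATEMENTS:
            result = await session.run(stmt)
            await result.consume()
```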
```cypher
// One-time projection of the routing topology into GDS.
// GDS projects numeric properties only, so zone must be stored as an
// integer here; drop it from nodeProperties if zone ids are strings.
CALL gds.graph.project(
  'routing_graph',
  'Intersection',
  {
    ROAD_SEGMENT: {
      type: 'ROAD_SEGMENT',
      properties: ['length_m', 'speed_limit']
    }
  },
  { nodeProperties: ['zone'] }
)
YIELD graphName;

// Per-request shortest path against the projected graph.
// Weighting by length_m returns the shortest route, not the fastest one.
MATCH (src:Intersection {id: $source_id})
MATCH (tgt:Intersection {id: $target_id})
CALL gds.shortestPath.dijkstra.stream('routing_graph', {
  sourceNode: src,
  targetNode: tgt,
  relationshipWeightProperty: 'length_m'
})
YIELD totalCost, path
RETURN totalCost, path
```
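For small fixtures it helps to check GDS output against an independent implementation. The function below is a plain-Python Dijkstra over an adjacency dict, a hypothetical test helper rather than the GDS algorithm; it returns the same pair the stream call yields (total cost and node path) for whatever weight is stored per edge.

```python
import heapq
from typing import Dict, List, Optional, Tuple

Graph = Dict[str, List[Tuple[str, float]]]  # node -> [(neighbour, weight), ...]


def dijkstra(graph: Graph, source: str, target: str) -> Optional[Tuple[float, List[str]]]:
    """Return (total_cost, node_path) for the cheapest directed path, or None."""
    dist: Dict[str, float] = {source: 0.0}
    prev: Dict[str, str] = {}
    heap: List[Tuple[float, str]] = [(0.0, source)]
    while heap:
        d, node = heapq.heappop(heap)
        if node == target:
            # Walk predecessors back to the source
            path = [node]
            while path[-1] != source:
                path.append(prev[path[-1]])
            return d, path[::-1]
        if d > dist.get(node, float("inf")):
            continue  # stale heap entry
        for nbr, w in graph.get(node, []):
            nd = d + w
            if nd < dist.get(nbr, float("inf")):
                dist[nbr] = nd
                prev[nbr] = node
                heapq.heappush(heap, (nd, nbr))
    return None  # target unreachable along edge directions
```

To route by travel time instead of distance, a weight of length_m / (speed_limit / 3.6) gives seconds when speed_limit is in km/h; such a derived property could be stored on each ROAD_SEGMENT and passed to GDS as relationshipWeightProperty.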
Production Scaling and Planner Considerations
Deterministic mapping standards prevent topology drift across incremental updates. When networks evolve, edge replacements must preserve node identifiers to avoid orphaned subgraphs and index fragmentation. Spatial scoping and multi-tenancy boundaries should be enforced at the ingestion layer, tagging nodes and edges with tenant identifiers before graph projection.
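A minimal sketch of identifier stability, assuming projected metric coordinates: quantize the snapped location to a fixed lattice and hash it together with the tenant id, so re-ingesting an unchanged intersection reproduces its id and tenants never share nodes. `stable_node_id`, the 1 m lattice, and the 16-character SHA-1 prefix are illustrative choices.

```python
import hashlib
from typing import Optional


def stable_node_id(x: float, y: float, grid_m: float = 1.0, tenant: Optional[str] = None) -> str:
    """Derive a reproducible node id from projected coordinates and a tenant tag."""
    # Quantize to the lattice so tiny floating-point drift maps to the same key
    qx, qy = round(x / grid_m), round(y / grid_m)
    key = f"{tenant or ''}|{qx}|{qy}"
    return hashlib.sha1(key.encode("utf-8")).hexdigest()[:16]
```

Points that straddle a lattice boundary still hash differently, so apply the snapping tolerance before deriving ids.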
Performance trade-offs are inherent to spatial graph construction:
- Projection Overhead: Planar transformations accelerate distance math but add CPU cycles during ETL. Cache transformed geometries when reprocessing static networks.
- Snapping Tolerance: Aggressive tolerances reduce node count but introduce routing artifacts. Validate snapped topology against ground-truth GPS traces.
- Batch Sizing: Larger batches amortize per-transaction round-trip overhead but increase transaction log and heap pressure. Monitor heap usage and adjust batch sizes dynamically based on neo4j.metrics or equivalent telemetry.
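Dynamic batch sizing can be as simple as additive increase, multiplicative decrease. The sketch below is hypothetical: it uses commit latency as a proxy for heap and transaction-log pressure (server metrics would be a better signal) and clamps sizes to the 5,000 to 20,000 range suggested earlier; `target_s` is an assumed threshold.

```python
from dataclasses import dataclass


@dataclass
class AdaptiveBatchSizer:
    """Additive-increase / multiplicative-decrease batch sizing (hypothetical helper)."""

    size: int = 10_000
    min_size: int = 5_000
    max_size: int = 20_000
    target_s: float = 2.0  # assumed acceptable commit time per batch
    step: int = 1_000

    def record(self, elapsed_s: float, failed: bool = False) -> int:
        """Update the batch size from one batch's outcome and return the new size."""
        if failed or elapsed_s > self.target_s:
            self.size = max(self.min_size, self.size // 2)  # back off quickly
        else:
            self.size = min(self.max_size, self.size + self.step)  # probe upward slowly
        return self.size
```

In the pipeline, each ingest_batch call would be timed with time.perf_counter(), the elapsed seconds (and whether the call raised) passed to record(), and the returned size used for the next batch.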
For comprehensive guidance on structuring transportation networks, refer to How to Map Road Networks to Graph Nodes and Edges. Properly mapped topology ensures that downstream routing algorithms, spatial analytics, and constraint solvers operate on mathematically sound foundations.