DAGs & Event Sourcing
Technical Reference
Part I: Directed Acyclic Graphs
Graph theory, algorithms, complexity analysis, and Python implementations
1. Graph Theory Primer
A graph G = (V, E) consists of a set of vertices (nodes) V and a set of edges E connecting pairs of vertices. Graphs are the mathematical structure behind networks, dependencies, hierarchies, and workflows.
Key Vocabulary
| Term | Definition | Notation |
|---|---|---|
| Vertex (node) | A point in the graph | v ∈ V |
| Edge | A connection between two vertices | (u, v) ∈ E |
| Directed edge | Edge with a direction: u → v (u is parent, v is child) | |
| Undirected edge | Edge with no direction: u — v | |
| In-degree | Number of edges pointing into a vertex | deg−(v) |
| Out-degree | Number of edges pointing out of a vertex | deg+(v) |
| Path | Sequence of vertices where each consecutive pair is connected by an edge | |
| Cycle | A path that starts and ends at the same vertex | |
| Adjacency list | For each vertex, a list of its neighbors. The standard representation. | |
| Root | A vertex with in-degree 0 (no parents) | |
| Leaf | A vertex with out-degree 0 (no children) | |
Representation in Code
```python
# Adjacency list — the standard graph representation
# Key = vertex, Value = list of vertices it points to
graph = {
    "A": ["B", "C"],  # A → B, A → C
    "B": ["D"],       # B → D
    "C": ["D"],       # C → D
    "D": [],          # D has no outgoing edges (leaf)
}

# |V| = 4 vertices, |E| = 4 edges
# In-degree:  A=0, B=1, C=1, D=2
# Out-degree: A=2, B=1, C=1, D=0
```
2. What Makes a DAG
A DAG is a graph that satisfies two constraints:
- Directed — Every edge has a direction (u → v)
- Acyclic — There is no path from any vertex back to itself
The acyclic constraint is what makes DAGs useful. Because there are no cycles, you can always find an ordering where every vertex appears before all vertices it points to. This ordering — topological sort — is the foundation for dependency resolution, build systems, pricing engines, and workflow scheduling.
3. DAG vs Tree vs General Graph
| Property | Tree | DAG | General Directed Graph |
|---|---|---|---|
| Directed | Yes (parent → child) | Yes | Yes |
| Acyclic | Yes | Yes | No (may have cycles) |
| Max parents per node | Exactly 1 (except root: 0) | Any number | Any number |
| Shared nodes | No — each node has unique path from root | Yes — multiple paths to same node | Yes |
| Topological order exists | Yes (BFS = level order) | Yes | Only if no cycles |
| Use case | File systems, DOM, org charts | Dependencies, pricing, workflows | Social networks, web links |
| Duplication needed for sharing | Yes (must clone subtree) | No (share via multiple parents) | No |
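The sharing rows above are easiest to see in code. A small sketch (node names invented for illustration): a diamond DAG stores the shared node once, while expanding it into a tree duplicates the subtree under every parent.

```python
# Diamond sharing in a DAG: "paper_stock" is referenced by two parents.
# In a tree, each parent would need its own duplicated copy.
dag = {
    "brochure": ["cover", "inner_pages"],
    "cover": ["paper_stock"],
    "inner_pages": ["paper_stock"],  # same node, second parent
    "paper_stock": [],
}

def node_count(graph: dict[str, list[str]]) -> int:
    """Distinct nodes stored in the DAG representation."""
    return len(graph)

def tree_node_count(graph: dict[str, list[str]], root: str) -> int:
    """Nodes required if every path duplicated its subtree (tree expansion)."""
    return 1 + sum(tree_node_count(graph, c) for c in graph[root])

print(node_count(dag))                   # 4 distinct nodes in the DAG
print(tree_node_count(dag, "brochure"))  # 5 nodes once expanded into a tree
```

The gap grows quickly: the deeper the shared node sits, the more copies a tree representation needs.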
4. Core Algorithms
4.1 Cycle Detection (Three-Color DFS) O(V + E)
Before any DAG operation, you must verify the graph is actually acyclic. The standard algorithm uses a three-color DFS marking:
- WHITE — Unvisited
- GRAY — Currently being explored (on the recursion stack)
- BLACK — Fully explored (all descendants visited)
If you encounter a GRAY node while exploring, you’ve found a back edge — a cycle.
```python
def has_cycle(graph: dict[str, list[str]]) -> bool:
    """Detect cycles using three-color DFS. O(V + E)."""
    WHITE, GRAY, BLACK = 0, 1, 2
    color = {v: WHITE for v in graph}

    def dfs(v: str) -> bool:
        color[v] = GRAY                  # Mark: currently exploring
        for neighbor in graph.get(v, []):
            if color[neighbor] == GRAY:  # Back edge → cycle!
                return True
            if color[neighbor] == WHITE:
                if dfs(neighbor):
                    return True
        color[v] = BLACK                 # Mark: fully explored
        return False

    return any(dfs(v) for v in graph if color[v] == WHITE)

# To find the actual cycle (not just detect it):
def find_cycle(graph: dict[str, list[str]]) -> list[str] | None:
    """Return the cycle path, or None if acyclic."""
    WHITE, GRAY, BLACK = 0, 1, 2
    color = {v: WHITE for v in graph}
    parent = {}

    def dfs(v: str) -> list[str] | None:
        color[v] = GRAY
        for neighbor in graph.get(v, []):
            if color[neighbor] == GRAY:
                # Reconstruct the cycle by walking parent links back to neighbor
                cycle = [neighbor, v]
                node = v
                while node != neighbor:
                    node = parent.get(node)
                    if node is None:
                        break
                    cycle.append(node)
                cycle.reverse()
                return cycle
            if color[neighbor] == WHITE:
                parent[neighbor] = v
                result = dfs(neighbor)
                if result:
                    return result
        color[v] = BLACK
        return None

    for v in graph:
        if color[v] == WHITE:
            result = dfs(v)
            if result:
                return result
    return None
```
4.2 Topological Sort — DFS-Based O(V + E)
Post-order DFS produces a reverse topological order. Reverse it to get the final ordering.
```python
def topological_sort_dfs(graph: dict[str, list[str]]) -> list[str]:
    """DFS-based topological sort. Returns vertices in dependency order.

    For every edge u → v, u appears BEFORE v in the result.
    Raises ValueError if the graph has a cycle.
    """
    WHITE, GRAY, BLACK = 0, 1, 2
    color = {v: WHITE for v in graph}
    result = []

    def dfs(v: str):
        color[v] = GRAY
        for neighbor in graph.get(v, []):
            if color[neighbor] == GRAY:
                raise ValueError(f"Cycle detected involving {v} → {neighbor}")
            if color[neighbor] == WHITE:
                dfs(neighbor)
        color[v] = BLACK
        result.append(v)  # Post-order: append AFTER visiting all children

    for v in graph:
        if color[v] == WHITE:
            dfs(v)
    result.reverse()      # Reversed post-order = topological order
    return result

# Example:
# graph = {"A": ["B", "C"], "B": ["D"], "C": ["D"], "D": []}
# topological_sort_dfs(graph) → ["A", "B", "C", "D"] or ["A", "C", "B", "D"]
# Both are valid — topological orderings are not unique.
```
4.3 Topological Sort — Kahn’s Algorithm (BFS) O(V + E)
Kahn’s algorithm uses in-degree counting and a queue. Bonus: it detects cycles for free (if the output has fewer vertices than the graph, there’s a cycle).
```python
from collections import deque

def topological_sort_kahn(graph: dict[str, list[str]]) -> list[str]:
    """BFS-based topological sort (Kahn's algorithm).

    Returns vertices in dependency order.
    Raises ValueError if the graph has a cycle.
    """
    # 1. Compute in-degree for every vertex
    in_degree = {v: 0 for v in graph}
    for v in graph:
        for neighbor in graph[v]:
            in_degree[neighbor] = in_degree.get(neighbor, 0) + 1

    # 2. Enqueue all vertices with in-degree 0 (no dependencies)
    queue = deque(v for v in graph if in_degree[v] == 0)
    result = []

    # 3. Process: dequeue, add to result, decrement neighbors' in-degrees
    while queue:
        v = queue.popleft()
        result.append(v)
        for neighbor in graph[v]:
            in_degree[neighbor] -= 1
            if in_degree[neighbor] == 0:
                queue.append(neighbor)  # All dependencies satisfied

    # 4. Cycle detection: if the result is shorter than the graph, a cycle exists
    if len(result) != len(graph):
        raise ValueError("Cycle detected — not a DAG")
    return result
```
4.4 Longest & Shortest Path in a DAG O(V + E)
Unlike general graphs, DAGs allow linear-time shortest and longest path computation by processing vertices in topological order:
```python
def longest_path(graph: dict[str, list[tuple[str, int]]], start: str) -> dict[str, int]:
    """Find the longest path from start to all reachable vertices.

    graph: adjacency list with weights — {"A": [("B", 3), ("C", 2)], ...}
    Returns: {vertex: longest_distance} from start.
    Used in critical path scheduling (PERT/CPM).
    """
    order = topological_sort_dfs({v: [n for n, _ in neighbors]
                                  for v, neighbors in graph.items()})
    dist = {v: float("-inf") for v in graph}
    dist[start] = 0
    for v in order:
        if dist[v] == float("-inf"):
            continue  # Not reachable from start
        for neighbor, weight in graph.get(v, []):
            if dist[v] + weight > dist[neighbor]:
                dist[neighbor] = dist[v] + weight
    return {v: d for v, d in dist.items() if d != float("-inf")}

def shortest_path(graph: dict[str, list[tuple[str, int]]], start: str) -> dict[str, int]:
    """Same structure — just flip the comparison and the sentinel."""
    order = topological_sort_dfs({v: [n for n, _ in neighbors]
                                  for v, neighbors in graph.items()})
    dist = {v: float("inf") for v in graph}
    dist[start] = 0
    for v in order:
        if dist[v] == float("inf"):
            continue
        for neighbor, weight in graph.get(v, []):
            if dist[v] + weight < dist[neighbor]:
                dist[neighbor] = dist[v] + weight
    return {v: d for v, d in dist.items() if d != float("inf")}
```
4.5 Transitive Closure & Reduction O(V · (V + E))
Transitive closure: Add an edge u → v for every pair where v is reachable from u. Answers “can I get from A to B?” in O(1).
Transitive reduction: Remove redundant edges. If A → B and A → C → B, then A → B is redundant (you can still reach B through C). The reduction has the fewest edges while preserving all reachability.
```python
def transitive_closure(graph: dict[str, list[str]]) -> dict[str, set[str]]:
    """Compute all-pairs reachability. O(V * (V + E))."""
    closure = {v: set() for v in graph}
    for v in graph:
        # DFS from each vertex
        stack = [v]
        visited = set()
        while stack:
            node = stack.pop()
            for neighbor in graph.get(node, []):
                if neighbor not in visited:
                    visited.add(neighbor)
                    closure[v].add(neighbor)
                    stack.append(neighbor)
    return closure

def transitive_reduction(graph: dict[str, list[str]]) -> dict[str, list[str]]:
    """Remove redundant edges while preserving reachability."""
    closure = transitive_closure(graph)
    reduced = {v: [] for v in graph}
    for v in graph:
        for neighbor in graph[v]:
            # Keep edge v → neighbor only if no OTHER path reaches neighbor
            redundant = False
            for other in graph[v]:
                if other != neighbor and neighbor in closure.get(other, set()):
                    redundant = True
                    break
            if not redundant:
                reduced[v].append(neighbor)
    return reduced
```
5. Dynamic Programming on DAGs
DAGs are the natural substrate for dynamic programming. Because there are no cycles, you can process vertices in topological order and guarantee that all dependencies are resolved before you compute a vertex’s value.
```python
# General pattern: DP on a DAG
def dag_dp(graph, values, combine_fn):
    """Compute a value for each vertex bottom-up.

    - values: base value for each node
    - combine_fn(node_value, child_values) → combined value
    """
    order = topological_sort_dfs(graph)
    order.reverse()  # Bottom-up: leaves first, root last
    result = {}
    for v in order:
        children = graph.get(v, [])
        if not children:
            result[v] = values[v]  # Leaf: use base value
        else:
            child_vals = [result[c] for c in children]
            result[v] = combine_fn(values[v], child_vals)  # Combine
    return result

# Example: pricing rollup
# combine_fn = lambda self_cost, child_costs: self_cost + sum(child_costs)
```
This is exactly how the pricing engine works: bottom-up traversal where each node’s total cost = its own cost + sum of children’s costs.
6. The Diamond Dependency Problem
When multiple paths converge on the same node (the diamond A → B → D, A → C → D), the shared node could be computed once per incoming path. In a tree, this can't happen (each node has exactly one parent). In a DAG, it's common and expected.
Solution: Memoization
```python
def evaluate_with_memo(graph, node, eval_fn, memo=None):
    """Evaluate a DAG node with memoization for shared nodes."""
    if memo is None:
        memo = {}
    if node in memo:
        return memo[node]  # Already computed — return cached value
    children = graph.get(node, [])
    child_results = [evaluate_with_memo(graph, c, eval_fn, memo) for c in children]
    result = eval_fn(node, child_results)
    memo[node] = result  # Cache for other paths
    return result
```
7. Serialization Formats
| Format | Structure | Pros | Cons |
|---|---|---|---|
| Flat map | {id: {children: [...], data: ...}} | O(1) lookup, DAG-native, no duplication | Must follow references to traverse |
| Adjacency list | {vertex: [neighbors]} | Standard, compact | Node data stored separately |
| Edge list | [(from, to), ...] | Simplest, good for bulk loading | O(E) to find neighbors |
| Nested JSON | {data: ..., children: [{...}]} | Human-readable tree | Duplicates shared nodes, tree-only |
For database storage, flat map with one row per node is the right choice (see the integration deep dive for PostgreSQL schema).
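As a sketch of the flat-map format (the node shape with children and data keys follows the table above; values are invented), the round trip below shows that shared nodes serialize once, referenced by ID:

```python
import json

# Flat map: one entry per node, children stored as ID references.
flat = {
    "A": {"children": ["B", "C"], "data": {"cost": 1.0}},
    "B": {"children": ["D"], "data": {"cost": 2.0}},
    "C": {"children": ["D"], "data": {"cost": 3.0}},
    "D": {"children": [], "data": {"cost": 4.0}},  # shared node stored ONCE
}

serialized = json.dumps(flat)   # DAG-native: "D" is not duplicated
restored = json.loads(serialized)

assert restored == flat         # lossless round trip
assert restored["B"]["children"] == ["D"] == restored["C"]["children"]  # both parents reference "D" by ID
```

Contrast with nested JSON, where serializing the same graph would embed a full copy of "D" under both "B" and "C".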
8. Complete Python Implementation
"""Complete DAG library — pure functions, no I/O, no dependencies."""
from dataclasses import dataclass
from collections import deque
from typing import Any, Callable
@dataclass(frozen=True)
class Node:
id: str
data: dict
children: tuple[str, ...] = ()
@dataclass(frozen=True)
class DAG:
nodes: dict # {str: Node}
root: str
# ── Construction ──────────────────────────────────────────
def add_node(dag: DAG, node: Node) -> DAG:
"""Add a node. Returns new DAG (immutable)."""
new_nodes = dict(dag.nodes)
new_nodes[node.id] = node
return DAG(nodes=new_nodes, root=dag.root)
def connect(dag: DAG, parent_id: str, child_id: str) -> DAG:
"""Add edge parent → child. Returns new DAG."""
parent = dag.nodes[parent_id]
if child_id in parent.children:
return dag # Already connected
new_parent = Node(
id=parent.id,
data=parent.data,
children=parent.children + (child_id,),
)
new_nodes = dict(dag.nodes)
new_nodes[parent_id] = new_parent
new_dag = DAG(nodes=new_nodes, root=dag.root)
# Validate: must still be acyclic
cycle = find_cycle_in_dag(new_dag)
if cycle:
raise ValueError(f"Adding edge {parent_id} → {child_id} creates cycle: {cycle}")
return new_dag
# ── Validation ────────────────────────────────────────────
def find_cycle_in_dag(dag: DAG) -> list[str] | None:
"""Return cycle path or None if valid DAG."""
adj = {nid: list(n.children) for nid, n in dag.nodes.items()}
WHITE, GRAY, BLACK = 0, 1, 2
color = {v: WHITE for v in adj}
parent = {}
def dfs(v):
color[v] = GRAY
for nb in adj.get(v, []):
if color.get(nb, WHITE) == GRAY:
cycle = [nb, v]
node = v
while node != nb:
node = parent.get(node)
if node is None: break
cycle.append(node)
cycle.reverse()
return cycle
if color.get(nb, WHITE) == WHITE:
parent[nb] = v
result = dfs(nb)
if result: return result
color[v] = BLACK
return None
for v in adj:
if color[v] == WHITE:
r = dfs(v)
if r: return r
return None
def validate(dag: DAG) -> list[str]:
"""Return list of errors (empty = valid)."""
errors = []
if dag.root not in dag.nodes:
errors.append(f"Root '{dag.root}' not found in nodes")
for nid, node in dag.nodes.items():
for cid in node.children:
if cid not in dag.nodes:
errors.append(f"Node '{nid}' references missing child '{cid}'")
cycle = find_cycle_in_dag(dag)
if cycle:
errors.append(f"Cycle detected: {' → '.join(cycle)}")
return errors
# ── Traversal ─────────────────────────────────────────────
def topological_sort(dag: DAG) -> list[str]:
"""Return node IDs in topological order (parents before children)."""
adj = {nid: list(n.children) for nid, n in dag.nodes.items()}
in_deg = {v: 0 for v in adj}
for v in adj:
for nb in adj[v]:
in_deg[nb] = in_deg.get(nb, 0) + 1
queue = deque(v for v in adj if in_deg[v] == 0)
order = []
while queue:
v = queue.popleft()
order.append(v)
for nb in adj[v]:
in_deg[nb] -= 1
if in_deg[nb] == 0:
queue.append(nb)
if len(order) != len(adj):
raise ValueError("Cycle detected")
return order
def reverse_topological_sort(dag: DAG) -> list[str]:
"""Return node IDs bottom-up (leaves first, root last)."""
return list(reversed(topological_sort(dag)))
def evaluate_bottom_up(dag: DAG, eval_fn: Callable) -> dict[str, Any]:
"""Bottom-up evaluation with memoization.
eval_fn(node: Node, child_results: list[Any]) -> Any
"""
order = reverse_topological_sort(dag)
memo = {}
for nid in order:
node = dag.nodes[nid]
child_results = [memo[cid] for cid in node.children]
memo[nid] = eval_fn(node, child_results)
return memo
def ancestors(dag: DAG, node_id: str) -> set[str]:
"""All nodes that can reach node_id by following children."""
# Build reverse adjacency
reverse = {nid: [] for nid in dag.nodes}
for nid, node in dag.nodes.items():
for cid in node.children:
reverse[cid].append(nid)
visited = set()
stack = [node_id]
while stack:
v = stack.pop()
for parent in reverse.get(v, []):
if parent not in visited:
visited.add(parent)
stack.append(parent)
return visited
def descendants(dag: DAG, node_id: str) -> set[str]:
"""All nodes reachable from node_id by following children."""
visited = set()
stack = list(dag.nodes[node_id].children)
while stack:
v = stack.pop()
if v not in visited:
visited.add(v)
stack.extend(dag.nodes[v].children)
return visited
Part II: Event Sourcing
Patterns, internals, evolution strategies, testing, anti-patterns, and Python implementations
9. The Core Concept
Event sourcing stores state as a sequence of immutable events rather than a mutable current-state record. The current state is derived by replaying the event sequence.
The Fundamental Equation
```python
# The fundamental equation (pseudocode):
#   current_state = fold(apply_event, initial_state, events)

# Or equivalently in Python:
from functools import reduce
state = reduce(apply_event, events, InitialState())
```
Where apply_event(state, event) → new_state is a pure function. This is the single most important thing to understand: the state transition function must be pure, deterministic, and side-effect-free.
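A minimal sketch of such a pure transition function, using event shapes that appear later in this document (JobCreated, QuoteApproved); the state-dict layout is invented for illustration:

```python
from functools import reduce

def apply_event(state: dict, event: dict) -> dict:
    """Pure transition: returns a NEW state dict, never mutates the input."""
    if event["event_type"] == "JobCreated":
        return {**state, "status": "created", "job_id": event["payload"]["job_id"]}
    if event["event_type"] == "QuoteApproved":
        return {**state, "status": "approved"}
    return state  # Unknown events are ignored: forward compatibility

events = [
    {"event_type": "JobCreated", "payload": {"job_id": "j1"}},
    {"event_type": "QuoteApproved", "payload": {}},
]
state = reduce(apply_event, events, {"status": "empty"})
print(state["status"])  # approved
```

Because apply_event is deterministic and side-effect-free, replaying the same events always yields the same state, which is what makes rebuilds and projections safe.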
10. Event Store Internals
Streams
Events are grouped into streams. A stream is an ordered sequence of events for a single aggregate (entity). Stream IDs are typically formatted as type:id — e.g., job:abc-123, product:def-456.
Versioning and Optimistic Concurrency
Each event in a stream has a monotonically increasing version number. When appending, you specify the expected version. If another writer has appended first, you get a concurrency conflict:
```python
# Optimistic concurrency control:
#   1. Read stream → current version is 5
#   2. Compute new events
#   3. Append with expected_version=5
#   4. If someone else appended first (version is now 6):
#      → CONFLICT — retry from step 1

def append_events(stream_id, events, expected_version):
    """Append events to a stream with optimistic concurrency."""
    # In SQL: INSERT INTO events (stream_id, version, ...) VALUES (...)
    # The UNIQUE(stream_id, version) constraint enforces this.
    # If expected_version + 1 is already taken → unique violation → retry.
    pass
```
Global Position vs Stream Version
| Concept | Scope | Use |
|---|---|---|
| Stream version | Per-stream (e.g., job:abc = v1, v2, v3) | Optimistic concurrency, aggregate replay |
| Global position | Across all streams (monotonic) | Projection catch-up, “process all events since position X” |
11. Aggregates and Boundaries
An aggregate is a consistency boundary. All events in one stream belong to one aggregate. The aggregate is the unit of transactional consistency.
Rules for Aggregate Design
- One stream per aggregate instance — stream job:abc-123 contains all events for that one job
- Commands target one aggregate — a command either succeeds and produces events for one stream, or fails entirely
- Cross-aggregate operations use sagas/process managers (see Section 16)
- Keep aggregates small — fewer events per stream = faster replay = less contention
12. Snapshots
When streams get long (hundreds of events), replaying from the beginning becomes slow. Snapshots cache the aggregate state at a point in time:
```python
# Without snapshots:
state = reduce(apply_event, all_500_events, InitialState())

# With snapshots:
snapshot = load_snapshot(stream_id)            # State at version 480
remaining = load_events(stream_id, after=480)  # Events 481-500
state = reduce(apply_event, remaining, snapshot.state)
# Only 20 events to replay instead of 500
```
Snapshot Strategies
| Strategy | When to Snapshot | Trade-off |
|---|---|---|
| Every N events | After every 100 events | Simple but arbitrary. May snapshot in the middle of a workflow. |
| Business boundaries | After QuoteApproved, after JobCompleted | Semantically meaningful. Recommended. |
| Time-based | Nightly, end of shift | Good for operational systems. Snapshot state matches a real point in time. |
| On demand | When replay exceeds a threshold | Adaptive but adds read-time latency on the first slow replay. |
13. Schema Evolution and Upcasting
Events are immutable. Once written, they never change. But your code evolves — event shapes need to change over time. This is the hardest operational problem in event sourcing.
Three Strategies
1. Upcasting (recommended)
Transform old event formats to the current format at read time. The stored event never changes.
```python
# Event v1 (original):
#   {"event_type": "QuoteGenerated", "price": 1234}
# Event v2 (new: price split into subtotal + tax):
#   {"event_type": "QuoteGenerated", "subtotal": 1100, "tax": 134}

# Upcaster: transform v1 → v2 on read
def upcast_quote_generated(event: dict) -> dict:
    if "price" in event["payload"] and "subtotal" not in event["payload"]:
        # v1 → v2: assume 10% tax rate for historical events
        price = event["payload"]["price"]
        event = {**event, "payload": {
            **event["payload"],
            "subtotal": round(price / 1.1, 2),
            "tax": round(price - price / 1.1, 2),
        }}
        del event["payload"]["price"]
    return event

# Register upcasters by event type
UPCASTERS = {
    "QuoteGenerated": [upcast_quote_generated],
}

def load_events_with_upcasting(stream_id):
    raw_events = load_raw_events(stream_id)
    return [apply_upcasters(e) for e in raw_events]

def apply_upcasters(event):
    for upcaster in UPCASTERS.get(event["event_type"], []):
        event = upcaster(event)
    return event
```
2. Weak Schema (flexible payloads)
Use permissive JSONB payloads. New fields are added; old fields are never removed. Consumers ignore unknown fields. Simple but can accumulate cruft over time.
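A sketch of weak-schema reading (field names follow the QuoteGenerated example from the upcasting strategy above): consumers read with defaults and ignore anything they don't recognize.

```python
# Weak-schema consumer: read with defaults, tolerate unknown fields.
def read_quote(payload: dict) -> tuple[float, float]:
    # v2 payloads have "subtotal"; v1 payloads only have "price".
    subtotal = payload.get("subtotal", payload.get("price", 0.0))
    tax = payload.get("tax", 0.0)  # Absent in v1 events: default applies
    return subtotal, tax

print(read_quote({"price": 1234}))                   # v1 payload still readable
print(read_quote({"subtotal": 1100, "tax": 134,
                  "new_field": True}))               # unknown field ignored
```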
3. Stream Migration (nuclear option)
Replay the entire stream through upcasters and write to a new stream. The old stream is archived. Use only when upcasting is too complex.
- Never remove fields from events — add new fields alongside old ones
- Never rename event types — add a new type and upcast the old one
- Always version your events from day one (even just schema_version: 1)
- Test upcasters against real historical data, not just synthetic examples
14. Projections In Depth
A projection is a denormalized read model built by processing events. Each projection is optimized for a specific query pattern.
Synchronous vs Asynchronous Projections
| Type | When Updated | Consistency | Use Case |
|---|---|---|---|
| Synchronous | In the same transaction as the event write | Strongly consistent | Critical reads that must reflect latest state |
| Asynchronous | After event write, via background processor or trigger | Eventually consistent (1–5s lag) | Dashboards, reports, search indexes |
Projection Lifecycle
```python
class Projection:
    """Base pattern for all projections."""

    def __init__(self):
        self.position = 0  # Last processed global position

    def handle(self, event: dict):
        """Route the event to the correct handler."""
        handler = getattr(self, f"on_{event['event_type']}", None)
        if handler:
            handler(event)
        self.position = event["global_position"]

    def rebuild(self, all_events: list[dict]):
        """Delete current state and replay all events from scratch."""
        self.reset()  # Truncate the projection table
        self.position = 0
        for event in all_events:
            self.handle(event)

    def catch_up(self, new_events: list[dict]):
        """Process only events since the last known position."""
        for event in new_events:
            if event["global_position"] > self.position:
                self.handle(event)

class JobBoardProjection(Projection):
    """Denormalized view of all jobs with current status."""

    def on_JobCreated(self, event):
        # INSERT INTO proj_job_board (job_id, status, customer) VALUES (...)
        pass

    def on_QuoteApproved(self, event):
        # UPDATE proj_job_board SET status = 'approved' WHERE job_id = ...
        pass

    def on_StageCompleted(self, event):
        # UPDATE proj_job_board SET status = ..., current_station = ... WHERE ...
        pass
```
15. CQRS: The Full Pattern
Command Query Responsibility Segregation — separate the write model (commands → events) from the read model (projections).
Command Handler Pattern
```python
def handle_command(command: dict, stream_events: list[dict]) -> list[dict]:
    """Pure function: given current state + a command, return new events.

    1. Rebuild current state from events
    2. Validate the command against current state
    3. If valid, produce new events
    4. If invalid, return a rejection event (or raise)
    """
    state = rebuild_state(stream_events)

    # Validation (pure — no I/O)
    errors = validate_command(state, command)
    if errors:
        return [{"event_type": "CommandRejected",
                 "payload": {"errors": errors, "command": command["type"]}}]

    # Decision logic (pure — no I/O)
    match command["type"]:
        case "ApproveQuote":
            return [{"event_type": "QuoteApproved",
                     "payload": {"approved_by": command["approver"],
                                 "price": state.quoted_price}}]
        case "MoveToStation":
            return [{"event_type": "MovedToStation",
                     "payload": {"station": command["station"],
                                 "previous": state.current_station}}]
        case _:
            return [{"event_type": "CommandRejected",
                     "payload": {"errors": [f"Unknown command: {command['type']}"]}}]
```
16. Sagas and Process Managers
When an operation spans multiple aggregates (e.g., “when a quote is approved, start production AND generate an invoice”), you need coordination across streams. Two patterns:
Choreography (decentralized)
Each aggregate reacts to events from other aggregates. No central coordinator.
```python
# QuoteApproved event is published
#   → Production aggregate listens, creates ProductionStarted event
#   → Billing aggregate listens, creates InvoiceGenerated event
# No coordinator — each aggregate reacts independently
```
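Choreography can be sketched with a minimal in-process publish/subscribe bus; the bus and the reacting handlers below are invented for illustration:

```python
from collections import defaultdict

# Minimal in-process event bus: event_type → list of handlers
subscribers = defaultdict(list)

def subscribe(event_type: str, handler):
    subscribers[event_type].append(handler)

def publish(event: dict):
    for handler in subscribers[event["event_type"]]:
        handler(event)

log = []
# Each "aggregate" reacts independently; nothing coordinates them.
subscribe("QuoteApproved", lambda e: log.append("ProductionStarted"))
subscribe("QuoteApproved", lambda e: log.append("InvoiceGenerated"))

publish({"event_type": "QuoteApproved", "payload": {"job_id": "j1"}})
print(log)  # ['ProductionStarted', 'InvoiceGenerated']
```

In production the bus would be the event store's subscription feed or a message broker, but the shape of the pattern is the same: reactions are registered, not orchestrated.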
Orchestration (centralized Process Manager)
A process manager maintains state and issues commands to aggregates:
```python
class OrderFulfillmentProcess:
    """Coordinates the multi-step order fulfillment workflow."""

    def on_QuoteApproved(self, event):
        return [
            Command("StartProduction", job_id=event.job_id),
            Command("GenerateInvoice", job_id=event.job_id),
        ]

    def on_ProductionCompleted(self, event):
        if self.invoice_paid:
            return [Command("ShipOrder", job_id=event.job_id)]
        # else: wait for payment

    def on_InvoicePaid(self, event):
        if self.production_complete:
            return [Command("ShipOrder", job_id=event.job_id)]
        # else: wait for production
```
17. Idempotency
Event delivery is typically at-least-once. A projection or handler may receive the same event twice (network retry, consumer restart). Handlers must be idempotent — processing the same event twice produces the same result.
Strategies
| Strategy | How | Use Case |
|---|---|---|
| Natural idempotency | SET status = 'approved' is naturally idempotent — setting it twice does nothing | Most projection updates |
| Deduplication table | Store processed event IDs. Skip if already seen. | Side effects (emails, webhooks) |
| Version checking | UPDATE ... SET x = y WHERE version = expected_version | Aggregate state updates |
| Upsert | INSERT ... ON CONFLICT DO UPDATE | Projection table maintenance |
```python
# Deduplication example:
processed_events = set()

def handle_idempotent(event):
    if event["event_id"] in processed_events:
        return  # Already processed — skip
    do_work(event)
    processed_events.add(event["event_id"])
```
18. Testing Event-Sourced Systems
Event sourcing has a natural testing pattern: Given / When / Then
```python
def test_approve_quote():
    """Given a created job with a quote, when we approve, then QuoteApproved is emitted."""
    # GIVEN: existing events (the history)
    given_events = [
        {"event_type": "JobCreated", "payload": {"job_id": "j1", "customer": "Acme"}},
        {"event_type": "QuoteGenerated", "payload": {"job_id": "j1", "price": 1234.50}},
    ]
    # WHEN: a command is issued
    command = {"type": "ApproveQuote", "job_id": "j1", "approver": "john"}
    # THEN: new events are produced
    new_events = handle_command(command, given_events)
    assert len(new_events) == 1
    assert new_events[0]["event_type"] == "QuoteApproved"
    assert new_events[0]["payload"]["approved_by"] == "john"

def test_cannot_approve_without_quote():
    """Given a job with no quote, approval should be rejected."""
    given_events = [
        {"event_type": "JobCreated", "payload": {"job_id": "j1", "customer": "Acme"}},
    ]
    command = {"type": "ApproveQuote", "job_id": "j1", "approver": "john"}
    new_events = handle_command(command, given_events)
    assert new_events[0]["event_type"] == "CommandRejected"
    assert "no quote" in new_events[0]["payload"]["errors"][0].lower()

def test_projection_rebuild():
    """Projections must produce identical state whether built incrementally or from scratch."""
    events = [
        {"event_type": "JobCreated", "payload": {"job_id": "j1"}, "global_position": 1},
        {"event_type": "QuoteApproved", "payload": {"job_id": "j1"}, "global_position": 2},
    ]
    # Incremental
    proj_a = JobBoardProjection()
    for e in events:
        proj_a.handle(e)
    # Full rebuild
    proj_b = JobBoardProjection()
    proj_b.rebuild(events)
    assert proj_a.get_state() == proj_b.get_state()
```
19. Anti-Patterns and Pitfalls
| Anti-Pattern | Why It’s Wrong | Correct Approach |
|---|---|---|
| Leaking internal events as public API | Internal state-change events (like FieldUpdated) tightly couple consumers to your aggregate internals. Any refactoring breaks downstream. | Publish separate integration events (coarse-grained, stable contract) for external consumers. Keep internal events private to the aggregate. |
| CRUD events (JobUpdated) | Events like JobUpdated(fields: {status: "printing"}) contain no domain meaning. Why did it change? Who decided? What happened? | Use domain events: QuoteApproved, MovedToStation, PrintStarted. Each captures a meaningful business occurrence. |
| Giant aggregates | Putting too much in one aggregate creates long event streams (slow replay) and high contention (concurrent commands conflict). | Keep aggregates focused on one consistency boundary. Use process managers for cross-aggregate coordination. |
| Querying the event store directly | Scanning event streams for queries is O(n) per stream and requires replaying to reconstruct state. | Use projections for all read queries. The event store is for writes and replay only. |
| Mutating events | Changing historical events destroys the audit trail and breaks any consumer that already processed the original. | Events are immutable. Use compensating events to correct mistakes (e.g., QuoteRevised to override a previous QuoteGenerated). |
| No event versioning | Without schema versions, you can’t upcast old events when the format changes. You’re stuck. | Add schema_version to every event from day one. Build upcasters before you need them. |
| Synchronous projections for everything | Updating all projections in the same transaction as the event write adds latency and can fail the write if a projection has a bug. | Default to async projections. Only use synchronous for the rare case where the read must be immediately consistent with the write. |
| Event sourcing everything | Not all data benefits from event sourcing. Reference data (customers, rate tables) changes rarely and has no meaningful event history. | Event-source aggregates with rich lifecycles (jobs, orders). Use plain CRUD for reference data. |
20. Complete Python Implementation
"""Complete event sourcing library — pure functions, no I/O, no dependencies."""
from dataclasses import dataclass, field
from typing import Callable
from functools import reduce
import uuid
from datetime import datetime, timezone
# ── Domain Events ─────────────────────────────────────────
@dataclass(frozen=True)
class DomainEvent:
event_id: str
stream_id: str
version: int
event_type: str
timestamp: str
payload: dict
metadata: dict = field(default_factory=dict)
schema_version: int = 1
@staticmethod
def create(stream_id: str, version: int, event_type: str,
payload: dict, metadata: dict = None) -> "DomainEvent":
return DomainEvent(
event_id=str(uuid.uuid4()),
stream_id=stream_id,
version=version,
event_type=event_type,
timestamp=datetime.now(timezone.utc).isoformat(),
payload=payload,
metadata=metadata or {},
)
# ── In-Memory Event Store ─────────────────────────────────
class InMemoryEventStore:
"""Minimal event store for testing and development."""
def __init__(self):
self._streams: dict[str, list[DomainEvent]] = {}
self._global_log: list[DomainEvent] = []
def append(self, stream_id: str, events: list[DomainEvent],
expected_version: int) -> int:
"""Append events with optimistic concurrency."""
stream = self._streams.get(stream_id, [])
current_version = len(stream)
if current_version != expected_version:
raise ConcurrencyError(
f"Expected version {expected_version}, "
f"but stream is at {current_version}"
)
for i, event in enumerate(events):
versioned = DomainEvent(
event_id=event.event_id,
stream_id=stream_id,
version=current_version + i + 1,
event_type=event.event_type,
timestamp=event.timestamp,
payload=event.payload,
metadata=event.metadata,
schema_version=event.schema_version,
)
stream.append(versioned)
self._global_log.append(versioned)
self._streams[stream_id] = stream
return current_version + len(events)
def load_stream(self, stream_id: str,
after_version: int = 0) -> list[DomainEvent]:
"""Load events from a stream, optionally after a version."""
stream = self._streams.get(stream_id, [])
return [e for e in stream if e.version > after_version]
def load_all(self, after_position: int = 0) -> list[DomainEvent]:
"""Load all events across all streams (for projections)."""
return self._global_log[after_position:]
class ConcurrencyError(Exception):
pass
# ── Aggregate Base ────────────────────────────────────────
class Aggregate:
"""Base class for event-sourced aggregates."""
def __init__(self):
self._version = 0
self._pending_events: list[DomainEvent] = []
@property
def version(self) -> int:
return self._version
@property
def pending_events(self) -> list[DomainEvent]:
return list(self._pending_events)
def load_from_history(self, events: list[DomainEvent]):
"""Replay events to rebuild state."""
for event in events:
self._apply(event)
self._version = event.version
def _raise_event(self, event_type: str, payload: dict):
"""Record a new event (to be persisted later)."""
event = DomainEvent.create(
stream_id="", # Set by repository
version=self._version + len(self._pending_events) + 1,
event_type=event_type,
payload=payload,
)
self._pending_events.append(event)
self._apply(event)
def _apply(self, event: DomainEvent):
"""Route event to handler. Override in subclasses."""
handler = getattr(self, f"_on_{event.event_type}", None)
if handler:
handler(event.payload)
def clear_pending(self):
self._version += len(self._pending_events)
self._pending_events.clear()
# ── Example: Job Aggregate ────────────────────────────────
class Job(Aggregate):
"""A production job — event-sourced aggregate."""
def __init__(self):
super().__init__()
self.job_id: str = ""
self.customer: str = ""
self.status: str = "empty"
self.quoted_price: float | None = None
self.current_station: str | None = None
# ── Commands (produce events) ──
def create(self, job_id: str, customer: str):
if self.status != "empty":
raise ValueError("Job already created")
self._raise_event("JobCreated", {
"job_id": job_id, "customer": customer,
})
def generate_quote(self, price: float):
if self.status != "created":
raise ValueError(f"Cannot quote in status '{self.status}'")
self._raise_event("QuoteGenerated", {
"job_id": self.job_id, "price": price,
})
def approve_quote(self, approver: str):
if self.status != "quoted":
raise ValueError(f"Cannot approve in status '{self.status}'")
self._raise_event("QuoteApproved", {
"job_id": self.job_id, "approved_by": approver,
"price": self.quoted_price,
})
def move_to_station(self, station: str, operator: str):
if self.status not in ("approved", "in_production"):
raise ValueError(f"Cannot move in status '{self.status}'")
self._raise_event("MovedToStation", {
"job_id": self.job_id, "station": station,
"operator": operator, "previous_station": self.current_station,
})
# ── Event handlers (apply state changes) ──
def _on_JobCreated(self, payload):
self.job_id = payload["job_id"]
self.customer = payload["customer"]
self.status = "created"
def _on_QuoteGenerated(self, payload):
self.quoted_price = payload["price"]
self.status = "quoted"
def _on_QuoteApproved(self, payload):
self.status = "approved"
def _on_MovedToStation(self, payload):
self.current_station = payload["station"]
self.status = "in_production"
# ── Repository ────────────────────────────────────────────
class JobRepository:
"""Load and save Job aggregates via the event store."""
def __init__(self, store: InMemoryEventStore):
self._store = store
def load(self, job_id: str) -> Job:
stream_id = f"job:{job_id}"
events = self._store.load_stream(stream_id)
job = Job()
job.load_from_history(events)
return job
def save(self, job: Job):
stream_id = f"job:{job.job_id}"
events = job.pending_events
self._store.append(stream_id, events, job.version)
job.clear_pending()
# ── Snapshot Support ──────────────────────────────────────
@dataclass(frozen=True)
class Snapshot:
stream_id: str
version: int
state: dict
created_at: str
class SnapshotStore:
def __init__(self):
self._snapshots: dict[str, Snapshot] = {}
def save(self, aggregate: Aggregate, stream_id: str):
self._snapshots[stream_id] = Snapshot(
stream_id=stream_id,
version=aggregate.version,
state=aggregate.__dict__.copy(),
created_at=datetime.now(timezone.utc).isoformat(),
)
def load(self, stream_id: str) -> Snapshot | None:
return self._snapshots.get(stream_id)
# ── Usage Example ─────────────────────────────────────────
if __name__ == "__main__":
store = InMemoryEventStore()
repo = JobRepository(store)
# Create a job
job = Job()
job.create("j-001", "Acme Cards")
job.generate_quote(1234.50)
repo.save(job)
# Later: load and approve
job = repo.load("j-001")
print(f"Status: {job.status}, Price: {job.quoted_price}")
# → Status: quoted, Price: 1234.5
job.approve_quote("john.smith")
job.move_to_station("heidelberg-xl-106", "maria.garcia")
repo.save(job)
# Replay: load again from events
job2 = repo.load("j-001")
print(f"Status: {job2.status}, Station: {job2.current_station}")
# → Status: in_production, Station: heidelberg-xl-106
# Full event history
for event in store.load_stream("job:j-001"):
print(f" v{event.version}: {event.event_type}")
Part III: Building It From Scratch
Why the internet misleads, what’s actually hard, what’s not, and how to build a DAG + event sourcing system with an AI agent
21. Why “DAG” Search Results Mislead
If you search for “DAG” in 2026, roughly 95% of results point to Apache Airflow, Prefect, Dagster, or Luigi. These are workflow orchestration tools that use DAGs to schedule task execution. This creates a massive discoverability problem: the data structure itself — one of the most fundamental and useful structures in computer science — is buried under framework marketing.
This happens because:
- Airflow dominates SEO. Airflow’s documentation, blog posts, and tutorials outnumber pure CS content by orders of magnitude. Google’s ranking rewards volume.
- AI tools inherit the bias. Perplexity, ChatGPT, and other AI assistants are trained on web corpora. When asked about “DAGs for an ERP,” they pattern-match to Airflow content and produce answers about task scheduling, retries, distributed queues, and executor pools — none of which apply to using a DAG as a data model.
- The CS fundamentals are “boring.” Nobody writes blog posts about adjacency lists and topological sort because the algorithms have been solved for 60 years. There’s no venture capital funding “DAG-as-a-data-structure.”
22. Workflow DAG vs Data-Structure DAG
These share a name and a mathematical definition, but they solve completely different problems:
| Dimension | Workflow DAG (Airflow, Prefect) | Data-Structure DAG (Your Use Case) |
|---|---|---|
| What nodes represent | Functions / tasks to execute | Components, materials, processes, products |
| What edges mean | “Run this before that” | “This contains / requires that” |
| Runtime behavior | Scheduler dispatches tasks to workers | Engine traverses graph to compute values |
| Failure mode | Task crashes mid-execution, need retry | Invalid graph (cycle), need validation |
| Concurrency concern | Parallel task execution across machines | Concurrent graph edits by users |
| State management | Track running / success / failed per task | Store graph structure in database |
| Scaling challenge | 1000s of tasks across distributed workers | Large graphs with many shared nodes |
| UI requirement | Monitor running tasks, logs, alerts | Edit product configuration visually |
| Infrastructure | Redis, Celery, Kubernetes, message queues | One Postgres database |
The Perplexity Trap
When AI tools are asked about “DAGs for ERP/MIS,” they conflate these two uses and produce alarming complexity assessments. A typical AI-generated response will warn about:
| AI’s Warning | Applies to Workflow DAGs | Applies to Your Data DAGs |
|---|---|---|
| Retries / partial task failures | Yes — tasks crash mid-run | No — you’re reading a graph, not executing tasks |
| Distributed queues / Redis locks | Yes — parallel task workers | No — single Postgres query loads the graph |
| Custom schedulers / slot exhaustion | Yes — when to run what | No — nothing to schedule |
| Observability / monitoring UI | Yes — monitor running tasks | No — UI shows product config, not task status |
| Idempotency | Yes — task re-execution | Later — only if you add event sourcing |
| Cycle detection | Yes | Yes — this one actually applies |
| Topological sort | Yes — task execution order | Yes — pricing rollup order |
Two out of seven concerns actually apply. Both are solved in ~80 lines of Python (see Sections 4.1 and 4.2).
23. Actual Code Size
A production-quality DAG library for product modeling breaks down to roughly 330 lines of pure Python:
| Component | Lines | What It Does |
|---|---|---|
| Graph structure (Node, DAG, add/remove/connect) | ~150 | Immutable data classes, construction functions |
| Cycle detection (three-color DFS) | ~50 | Validates that the graph is acyclic |
| Topological sort (Kahn’s) | ~40 | Determines correct evaluation order |
| Bottom-up pricing evaluation | ~60 | Traverses leaves-to-root with memoization |
| Serialization (to/from JSON) | ~30 | Converts DAG to flat map for Supabase storage |
| Total | ~330 |
For comparison:
- Apache Airflow core: ~500,000 lines
- Prefect core: ~100,000 lines
- Your DAG data structure: ~330 lines
This size difference of roughly three orders of magnitude exists because those tools solve a fundamentally harder problem (distributed task execution). You’re solving a data modeling problem — adjacency lists and recursive traversal. The complete implementation is already in Section 8 of this guide.
24. Layer-by-Layer Difficulty
Building the full system involves layers beyond the core DAG. Here’s an honest assessment of each:
| Layer | Difficulty | Lines | Why |
|---|---|---|---|
| Core DAG library | Easy | ~330 | Well-understood CS. Pure functions. Easy to test. No dependencies. An AI agent can write and test this in one session. |
| Persistence (Supabase/Postgres) | Medium | ~200 | Schema design matters (one row per node, JSONB for attributes). TOAST compression gotchas with large JSONB. Migrations. But it’s standard SQL — no novel algorithms. |
| Pricing engine traversal | Medium | ~150 | Bottom-up evaluation with memoization (Section 5–6). Diamond dependencies require care. But the algorithm is well-known and testable with zero I/O. |
| Event sourcing layer | Medium–Hard | ~400 | Versioning, optimistic concurrency, snapshots, upcasting. Conceptually elegant but operationally tricky. Schema evolution is the hardest part (Section 13). |
| CQRS + projections | Medium–Hard | ~300 | Separate write and read models. Projection rebuild logic. Eventually-consistent UIs. Requires discipline to keep projections disposable. |
| UI for editing graphs | Hard | ~2000+ | Visual node editor with drag-and-drop, real-time validation, undo/redo. This is a frontend project unto itself. Libraries like React Flow help but still require significant integration. |
| Multi-user concurrency | Hard | Varies | Two users editing the same product graph simultaneously. Optimistic locking handles simple cases; real-time collaboration (Google Docs style) is a major engineering effort. |
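For the "simple cases" of multi-user concurrency mentioned in the table, optimistic locking is the standard tool. The sketch below is an in-memory stand-in, assuming a Postgres-style version column; the class and table names are illustrative, not part of any library here.

```python
# Hedged sketch: optimistic locking for concurrent graph edits.
# Assumes a version-column scheme; VersionedStore is an in-memory stand-in.

class StaleVersionError(Exception):
    pass

class VersionedStore:
    """In-memory stand-in for one Postgres row with a version column."""
    def __init__(self, state: dict):
        self._state = state
        self._version = 1

    def read(self) -> tuple[dict, int]:
        return dict(self._state), self._version

    def write(self, new_state: dict, expected_version: int):
        # Postgres equivalent (illustrative):
        #   UPDATE product_graphs SET state = $1, version = version + 1
        #   WHERE id = $2 AND version = $3   -- 0 rows updated means conflict
        if self._version != expected_version:
            raise StaleVersionError(
                f"expected v{expected_version}, store is at v{self._version}")
        self._state = new_state
        self._version += 1

store = VersionedStore({"nodes": ["order"]})

# Users A and B both read version 1
state_a, v_a = store.read()
state_b, v_b = store.read()

# A saves first; the version advances to 2
store.write({"nodes": state_a["nodes"] + ["card_sheet"]}, v_a)

# B saves against the stale version 1 and is rejected
try:
    store.write({"nodes": state_b["nodes"] + ["packaging"]}, v_b)
except StaleVersionError as e:
    print(f"Conflict: {e}")
```

The rejected writer then re-reads, re-applies its edit, and retries; that loop covers simple cases, while Google-Docs-style live merging remains the major effort the table describes.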
25. Mapping to Soft Code Architecture
If you’re using the Soft Code file structure (as in tool-hub-try5), every layer maps cleanly:
Key observations:
- core/ has zero I/O. The DAG library, cycle detection, topological sort, and pricing engine are all pure functions. They take data in and return data out. No database, no network, no file system.
- adapters/ handles all persistence. Loading a graph from Supabase, saving events, managing connections — all isolated from business logic.
- The orchestrator is ~15 lines. Load graph → validate → compute price → format output. No branching. No I/O. Just function calls in sequence.
- Tests are trivial. Feed a graph dictionary to cycle_detection(), assert it returns the expected result. No mocking, no database setup, no test fixtures.
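As a concrete illustration of that last point, here is what such a test can look like: plain asserts against a pure function, no fixtures. The `has_cycle` below is a standalone sketch operating on a bare adjacency dict, written to mirror the three-color DFS used elsewhere in this guide; it is illustrative, not the library version.

```python
# Illustrative plain-assert test of cycle detection on an adjacency dict.
# Standalone sketch using three-color DFS; no mocks, no database, no fixtures.

def has_cycle(adj: dict[str, list[str]]) -> bool:
    WHITE, GRAY, BLACK = 0, 1, 2
    color = {v: WHITE for v in adj}

    def dfs(v):
        color[v] = GRAY
        for w in adj.get(v, []):
            if color.get(w) == GRAY:
                return True          # back edge: a cycle
            if color.get(w, WHITE) == WHITE and dfs(w):
                return True
        color[v] = BLACK
        return False

    return any(dfs(v) for v in adj if color[v] == WHITE)

# Feed a dict, assert the answer
assert has_cycle({"A": ["B"], "B": ["C"], "C": []}) is False
assert has_cycle({"A": ["B"], "B": ["A"]}) is True
assert has_cycle({"A": ["B", "C"], "B": ["D"], "C": ["D"], "D": []}) is False  # diamond, no cycle
assert has_cycle({}) is False
```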
26. What an AI Agent Build Session Looks Like
Here’s what building the product_graph tool with an AI agent actually involves — step by step, following the Soft Code workflow:
Session 1: Core DAG (~1 hour)
- DECISIONS.md — List the decisions: What node types exist? Can any node connect to any other? How is pricing calculated? What happens with diamond dependencies?
- models.py — Define Node, DAG, PricingResult as frozen dataclasses
- decision_logic.py — has_cycle(), can_connect(), validate_graph()
- workflow.py — topological_sort(), evaluate_bottom_up(), price_product()
- Tests — Happy path, cycle detection, diamond dependencies, empty graph, single node
- Run preflight.py — Verify all Soft Code constraints pass
Session 2: Persistence (~1 hour)
- SQL schema — config_nodes table with JSONB attributes
- io.py — load_graph(product_id) and save_graph(dag)
- Integration tests — Round-trip: create graph → save → load → verify identical
Session 3: Pricing Engine (~1 hour)
- Pricing functions — Define per-node-type pricing (material cost, process cost, markup)
- Bottom-up traversal — Wire evaluate_bottom_up() to real pricing functions
- Tests — Known product configurations with hand-calculated expected prices
Session 4: Event Sourcing (optional, ~2 hours)
- Event definitions — NodeAdded, NodeRemoved, EdgeCreated, PriceCalculated
- Event store — Append-only table, optimistic concurrency
- Aggregate — ProductGraph aggregate that rebuilds from events
- Tests — Given/When/Then pattern (Section 18)
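The Given/When/Then pattern reads naturally as plain asserts. The sketch below uses minimal stand-in functions (`apply_event`, `decide_approve`) rather than the full Section 20 classes, so it stays self-contained; the event shapes are illustrative.

```python
# Hedged sketch of Given/When/Then testing for event-sourced decision logic.
# apply_event and decide_approve are minimal stand-ins, not the Section 20 classes.

def apply_event(state: dict, event: dict) -> dict:
    """Fold one event into the current state."""
    if event["type"] == "JobCreated":
        return {**state, "status": "created", "job_id": event["job_id"]}
    if event["type"] == "QuoteGenerated":
        return {**state, "status": "quoted", "price": event["price"]}
    return state

def decide_approve(state: dict, approver: str) -> dict:
    """Command handler: validate against state, emit an event."""
    if state.get("status") != "quoted":
        raise ValueError(f"Cannot approve in status '{state.get('status')}'")
    return {"type": "QuoteApproved", "approved_by": approver}

# GIVEN a history of past events...
given = [
    {"type": "JobCreated", "job_id": "j-001"},
    {"type": "QuoteGenerated", "price": 1234.50},
]
state = {}
for e in given:
    state = apply_event(state, e)

# WHEN a command is issued...
event = decide_approve(state, "john.smith")

# THEN the expected event is produced
assert event == {"type": "QuoteApproved", "approved_by": "john.smith"}

# And the same command in the wrong state is rejected
try:
    decide_approve({"status": "created"}, "john.smith")
    assert False, "should have raised"
except ValueError:
    pass
```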
Total: ~5 hours of AI-assisted work for the complete system. The AI agent writes the code; you review and make the business decisions (what node types, what pricing rules, what edge constraints). The hard part is the decisions, not the implementation.
27. Event Sourcing From Scratch: Same Story
Event sourcing has the same discoverability problem as DAGs. Most content online covers:
- EventStoreDB — a dedicated event store database (powerful but heavy infrastructure)
- Axon Framework — Java enterprise event sourcing (massive, complex)
- Kafka + event streaming — distributed messaging (solves a different problem entirely)
For a single-tenant ERP/MIS on Supabase Postgres, you need none of these. Event sourcing is fundamentally simple:
# The entire concept in 4 lines:
events_table = "INSERT INTO events (stream_id, version, type, payload) VALUES (...)"
load_stream = "SELECT * FROM events WHERE stream_id = ? ORDER BY version"
current_state = reduce(apply_event, load_stream(id), InitialState())
# That's it. Everything else is optimization.
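The same concept, made runnable in memory. This is a sketch: the event list and handler names are illustrative, not a prescribed schema.

```python
# Runnable in-memory version of the four-line concept above.
# Event shapes are illustrative.
from functools import reduce

events = [
    {"type": "JobCreated", "customer": "Acme Cards"},
    {"type": "QuoteGenerated", "price": 1234.50},
    {"type": "QuoteApproved", "approved_by": "john.smith"},
]

def apply_event(state: dict, event: dict) -> dict:
    """Pure left-fold step: (state, event) -> new state."""
    handlers = {
        "JobCreated":     lambda s, e: {**s, "status": "created", "customer": e["customer"]},
        "QuoteGenerated": lambda s, e: {**s, "status": "quoted", "price": e["price"]},
        "QuoteApproved":  lambda s, e: {**s, "status": "approved"},
    }
    return handlers[event["type"]](state, event)

# current_state = reduce(apply_event, load_stream(id), InitialState())
current_state = reduce(apply_event, events, {})
assert current_state == {"status": "approved", "customer": "Acme Cards", "price": 1234.50}
```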
The complete implementation is in Section 20 (~270 lines). The complexity drivers are:
| Concern | When It Matters | Can You Defer It? |
|---|---|---|
| Optimistic concurrency | Multiple users editing simultaneously | Yes — start single-user |
| Snapshots | Streams exceed ~500 events | Yes — won’t happen for months |
| Schema evolution / upcasting | When you change event shapes | No — add schema_version from day one |
| Projections | When you need fast reads | Yes — start with direct replay |
| Sagas / process managers | Cross-aggregate workflows | Yes — start with single aggregates |
Add schema_version: 1 to every event from day one. This costs nothing and saves you from the single worst event sourcing pitfall: being unable to evolve event formats later (Section 13).
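A minimal upcasting sketch shows why that one field pays off. The event type and payload fields below are hypothetical examples, invented here to illustrate the mechanism.

```python
# Illustrative upcaster: migrate old event payloads at read time.
# Assumes a hypothetical v1 event stored one "name" field that v2 splits in two.

def upcast(event: dict) -> dict:
    """Rewrite an event to the current schema; pass current events through."""
    if event["type"] == "CustomerRenamed" and event["schema_version"] == 1:
        first, _, last = event["payload"]["name"].partition(" ")
        return {
            **event,
            "schema_version": 2,
            "payload": {"first_name": first, "last_name": last},
        }
    return event  # already current

old = {"type": "CustomerRenamed", "schema_version": 1,
       "payload": {"name": "Maria Garcia"}}
new = upcast(old)
assert new["payload"] == {"first_name": "Maria", "last_name": "Garcia"}
assert new["schema_version"] == 2
```

Without the schema_version field there is no reliable way to tell old payloads from new ones, which is exactly the trap the tip above warns about.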
28. The “From Scratch” Advantage
Building DAG + event sourcing from scratch (rather than adopting a framework) gives you specific advantages:
1. No Airflow Baggage
Airflow’s DAG abstraction is designed for task scheduling. It forces you to express everything as “operators” with retry policies, execution dates, and scheduling intervals. None of this applies to product modeling. If you used Airflow, you’d spend more time working around the framework than building your system.
2. Domain-Specific Node Types
Your DAG nodes aren’t generic tasks — they’re Material, Process, Component, Product. Each has different attributes, different pricing functions, different validation rules. A generic graph library would require you to bolt these on; building from scratch means they’re native.
3. Pure Function Testability
Every function in your core library is pure: data in, data out. No database connections, no network calls, no file I/O. This means tests run in milliseconds, require no setup, and never flake. A framework would inject its own runtime, making tests slower and more fragile.
4. Perfect Fit for Soft Code
The 330-line library slots directly into the Soft Code file structure. models.py holds data shapes. decision_logic.py holds graph validation. workflow.py holds traversal. Each file has one job. A framework would violate this separation — framework code and business code inevitably interleave.
5. No Dependency Risk
Zero external dependencies means zero supply chain risk, zero breaking changes from upstream, and zero compatibility issues with Python version upgrades. The code you write today will work identically in 5 years.
The counterargument (and when it’s valid)
The legitimate case for a framework is when you need distributed execution: running tasks across multiple machines, handling hardware failures, scaling to millions of operations. If you ever need that, use Temporal, Airflow, or Prefect. But that’s workflow orchestration, not product modeling.
29. Build Order and Dependencies
The layers have a natural dependency order. Build bottom-up, shipping each layer before starting the next:
The critical path
Phase 0 → Phase 1 → Phase 2 is the minimum viable system. A product can be modeled as a DAG, stored in Supabase, and priced with a single API call. Everything after Phase 2 adds operational sophistication but doesn’t change the core capability.
Start with a tree, upgrade to a DAG
A pragmatic migration path:
- Start with parent_id (tree) — simplest schema, covers 80% of products
- When you hit a shared component, upgrade to parents[] (DAG) — the algorithms are backward-compatible because every tree is already a valid DAG
- When you need audit trails, add event sourcing — wrap the existing DAG operations in events
Each step is additive. No rewrites. No migrations that break existing data.
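The tree-to-DAG step can be sketched as a reinterpretation of the same rows. Column names (id, parent_id, parents) are illustrative, assuming the Supabase schema described above.

```python
# Sketch of the tree -> DAG upgrade: the same rows, reinterpreted.
# Column names are illustrative, not a prescribed schema.

tree_rows = [
    {"id": "order",      "parent_id": None},
    {"id": "card_sheet", "parent_id": "order"},
    {"id": "cardstock",  "parent_id": "card_sheet"},
]

# Step 1: parent_id (tree) becomes parents[] (DAG).
# Every tree is already a valid DAG, so this is lossless.
dag_rows = [
    {"id": r["id"], "parents": [] if r["parent_id"] is None else [r["parent_id"]]}
    for r in tree_rows
]

# Step 2: a shared component is now just a node with two parents.
dag_rows.append({"id": "lamination", "parents": ["order", "card_sheet"]})

by_id = {r["id"]: r for r in dag_rows}
assert by_id["cardstock"]["parents"] == ["card_sheet"]
assert len(by_id["lamination"]["parents"]) == 2
```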
30. Complete Proof-of-Concept: Product Graph Tool
Here’s a complete, self-contained product graph tool that models a trading card product, prices it via bottom-up DAG traversal, and demonstrates every concept from Parts I–III. This is what the core/product_graph tool would look like in the Soft Code structure:
"""
product_graph — Proof of concept
Models a trading card product as a DAG, prices it bottom-up.
Pure functions. Zero I/O. Zero dependencies beyond stdlib.
"""
from dataclasses import dataclass, field
from collections import deque
# ── Domain Models ─────────────────────────────────────────
@dataclass(frozen=True)
class ConfigNode:
"""A single node in the product configuration DAG."""
id: str
node_type: str # "material", "process", "component", "product"
name: str
attributes: dict = field(default_factory=dict)
children: tuple[str, ...] = ()
base_cost: float = 0.0 # Direct cost of this node
pricing_fn: str = "sum_children" # How to aggregate child costs
@dataclass(frozen=True)
class ProductGraph:
"""An immutable product configuration DAG."""
nodes: dict # {str: ConfigNode}
root_id: str
@dataclass(frozen=True)
class PricingResult:
"""Output of pricing a product graph."""
total_cost: float
node_costs: dict # {node_id: cost}
traversal_order: list # Bottom-up evaluation order
# ── Graph Construction (immutable — returns new graphs) ───
def add_node(graph: ProductGraph, node: ConfigNode) -> ProductGraph:
nodes = dict(graph.nodes)
nodes[node.id] = node
return ProductGraph(nodes=nodes, root_id=graph.root_id)
def connect(graph: ProductGraph, parent_id: str, child_id: str) -> ProductGraph:
parent = graph.nodes[parent_id]
if child_id in parent.children:
return graph
updated = ConfigNode(
id=parent.id, node_type=parent.node_type, name=parent.name,
attributes=parent.attributes,
children=parent.children + (child_id,),
base_cost=parent.base_cost, pricing_fn=parent.pricing_fn,
)
nodes = dict(graph.nodes)
nodes[parent_id] = updated
new_graph = ProductGraph(nodes=nodes, root_id=graph.root_id)
if has_cycle(new_graph):
raise ValueError(f"Edge {parent_id} → {child_id} creates a cycle")
return new_graph
# ── Validation ────────────────────────────────────────────
def has_cycle(graph: ProductGraph) -> bool:
WHITE, GRAY, BLACK = 0, 1, 2
color = {nid: WHITE for nid in graph.nodes}
def dfs(v):
color[v] = GRAY
for child in graph.nodes[v].children:
if color.get(child) == GRAY:
return True
if color.get(child) == WHITE and dfs(child):
return True
color[v] = BLACK
return False
return any(dfs(v) for v in graph.nodes if color[v] == WHITE)
def validate(graph: ProductGraph) -> list[str]:
errors = []
if graph.root_id not in graph.nodes:
errors.append(f"Root '{graph.root_id}' not in nodes")
for nid, node in graph.nodes.items():
for cid in node.children:
if cid not in graph.nodes:
errors.append(f"Node '{nid}' references missing child '{cid}'")
if has_cycle(graph):
errors.append("Graph contains a cycle")
return errors
# ── Traversal ─────────────────────────────────────────────
def topological_sort(graph: ProductGraph) -> list[str]:
in_deg = {nid: 0 for nid in graph.nodes}
for node in graph.nodes.values():
for cid in node.children:
in_deg[cid] = in_deg.get(cid, 0) + 1
queue = deque(nid for nid in graph.nodes if in_deg[nid] == 0)
order = []
while queue:
v = queue.popleft()
order.append(v)
for cid in graph.nodes[v].children:
in_deg[cid] -= 1
if in_deg[cid] == 0:
queue.append(cid)
if len(order) != len(graph.nodes):
raise ValueError("Cycle detected")
return order
# ── Pricing Engine ────────────────────────────────────────
PRICING_FUNCTIONS = {
"sum_children": lambda self_cost, child_costs: self_cost + sum(child_costs),
"max_children": lambda self_cost, child_costs: self_cost + max(child_costs, default=0),
"markup_15": lambda self_cost, child_costs: (self_cost + sum(child_costs)) * 1.15,
}
def price_product(graph: ProductGraph) -> PricingResult:
"""Bottom-up pricing with memoization. Handles diamond dependencies correctly."""
errors = validate(graph)
if errors:
raise ValueError(f"Invalid graph: {errors}")
order = list(reversed(topological_sort(graph))) # Leaves first
costs = {}
for nid in order:
node = graph.nodes[nid]
child_costs = [costs[cid] for cid in node.children]
fn = PRICING_FUNCTIONS.get(node.pricing_fn, PRICING_FUNCTIONS["sum_children"])
costs[nid] = fn(node.base_cost, child_costs)
return PricingResult(
total_cost=costs[graph.root_id],
node_costs=costs,
traversal_order=order,
)
# ── Serialization (for database storage) ──────────────────
def to_flat_map(graph: ProductGraph) -> dict:
"""Convert to JSON-serializable flat map (one entry per node)."""
return {
"root_id": graph.root_id,
"nodes": {
nid: {
"id": node.id,
"node_type": node.node_type,
"name": node.name,
"attributes": node.attributes,
"children": list(node.children),
"base_cost": node.base_cost,
"pricing_fn": node.pricing_fn,
}
for nid, node in graph.nodes.items()
},
}
def from_flat_map(data: dict) -> ProductGraph:
"""Reconstruct a ProductGraph from a flat map."""
nodes = {}
for nid, ndata in data["nodes"].items():
nodes[nid] = ConfigNode(
id=ndata["id"],
node_type=ndata["node_type"],
name=ndata["name"],
attributes=ndata.get("attributes", {}),
children=tuple(ndata.get("children", [])),
base_cost=ndata.get("base_cost", 0.0),
pricing_fn=ndata.get("pricing_fn", "sum_children"),
)
return ProductGraph(nodes=nodes, root_id=data["root_id"])
# ══════════════════════════════════════════════════════════
# DEMO: Model and price a trading card product
# ══════════════════════════════════════════════════════════
if __name__ == "__main__":
# Build the product DAG
#
# TradingCardOrder (product, markup_15)
# ├── CardSheet (component)
# │ ├── Cardstock (material, $0.12)
# │ ├── FrontPrint (process, $0.08)
# │ ├── BackPrint (process, $0.05)
# │ └── Lamination (process, $0.03) ← SHARED (diamond dependency)
# ├── Packaging (component)
# │ ├── Box (material, $0.25)
# │ └── Shrinkwrap (process, $0.02)
# └── Lamination (process, $0.03) ← SHARED (same node, not a copy)
nodes = {
"order": ConfigNode("order", "product", "Trading Card Order", pricing_fn="markup_15"),
"card_sheet": ConfigNode("card_sheet", "component", "Card Sheet"),
"packaging": ConfigNode("packaging", "component", "Packaging"),
"cardstock": ConfigNode("cardstock", "material", "Cardstock 300gsm", base_cost=0.12),
"front": ConfigNode("front", "process", "Front Print CMYK", base_cost=0.08),
"back": ConfigNode("back", "process", "Back Print 1-color", base_cost=0.05),
"lamination": ConfigNode("lamination", "process", "Gloss Lamination", base_cost=0.03),
"box": ConfigNode("box", "material", "Display Box", base_cost=0.25),
"shrinkwrap": ConfigNode("shrinkwrap", "process", "Shrinkwrap", base_cost=0.02),
}
g = ProductGraph(nodes=nodes, root_id="order")
# Connect edges (building the DAG)
g = connect(g, "order", "card_sheet")
g = connect(g, "order", "packaging")
g = connect(g, "order", "lamination") # Lamination shared at order level
g = connect(g, "card_sheet", "cardstock")
g = connect(g, "card_sheet", "front")
g = connect(g, "card_sheet", "back")
g = connect(g, "card_sheet", "lamination") # Same lamination node — diamond dependency!
g = connect(g, "packaging", "box")
g = connect(g, "packaging", "shrinkwrap")
# Validate
errors = validate(g)
print(f"Validation: {'PASS' if not errors else errors}")
# Price it
result = price_product(g)
print(f"\nTraversal order (bottom-up): {result.traversal_order}")
print(f"\nPer-node costs:")
for nid in result.traversal_order:
node = g.nodes[nid]
print(f" {node.name:<25} ${result.node_costs[nid]:.4f}")
print(f"\n{'='*50}")
print(f" TOTAL (with 15% markup): ${result.total_cost:.4f}")
# Key insight: Lamination ($0.03) appears in the costs ONCE,
# even though both card_sheet and order reference it.
# The bottom-up traversal with topological sort handles this
# automatically — each node is evaluated exactly once.
# Serialize round-trip
flat = to_flat_map(g)
restored = from_flat_map(flat)
result2 = price_product(restored)
assert result.total_cost == result2.total_cost
print(f"\n Serialization round-trip: PASS")
Expected Output
Validation: PASS
Traversal order (bottom-up): ['shrinkwrap', 'box', 'lamination', 'back', 'front', 'cardstock', 'packaging', 'card_sheet', 'order']
Per-node costs:
Shrinkwrap $0.0200
Display Box $0.2500
Gloss Lamination $0.0300
Back Print 1-color $0.0500
Front Print CMYK $0.0800
Cardstock 300gsm $0.1200
Packaging $0.2700
Card Sheet $0.2800
Trading Card Order $0.6670
==================================================
TOTAL (with 15% markup): $0.6670
Serialization round-trip: PASS
- Lamination appears once in the cost breakdown ($0.03), even though card_sheet and order both reference it. Topological sort ensures it’s evaluated exactly once.
- Card Sheet = cardstock ($0.12) + front ($0.08) + back ($0.05) + lamination ($0.03) = $0.28. Correct.
- Packaging = box ($0.25) + shrinkwrap ($0.02) = $0.27. Correct.
- Order = (card_sheet ($0.28) + packaging ($0.27) + lamination ($0.03)) × 1.15 = $0.667. Correct.
- 330 lines. Zero dependencies. Zero I/O. Full test coverage possible with plain asserts.
DAGs & Event Sourcing — Technical Reference · March 2026
Pure theory, algorithms, patterns, working Python code, and build-from-scratch guide