DAG Event Sourcing × Tool Hub
Enterprise ERP MIS Deep Dive
1. Executive Summary
This document is the complete architectural reference for evolving tool-hub-try5 — a repository of 12 stateless, pure-function tools built with Soft Code discipline — into an enterprise-grade ERP MIS capable of modeling composable products, millisecond quoting, full production tracking, and eventually agentic AI orchestration.
The transformation strategy combines three architectural patterns:
- DAG Product Model — Products are directed acyclic graphs of typed nodes, not flat database rows. A trading card order with UV coating, security foil, and multiple layers becomes a graph of interconnected configuration nodes.
- Event Sourcing — Every state change is recorded as an immutable event. Instead of overwriting a job's status field, you append JobCreated, NormalizationCompleted, and SheetImposed events. Current state is derived by replaying the event stream.
- Supabase Postgres + JSONB — Managed PostgreSQL with JSONB for flexible product configuration storage, relational tables for stable schemas (rate tables, customers), and Row Level Security for multi-tenant isolation.
The existing Soft Code discipline — pure functions in core/, I/O in adapters/, dumb orchestrators, frozen dataclasses — is already aligned with event sourcing. Decisions are pure functions. Domain models are immutable. Orchestrators are sequence-only. The DAG and event layers are additive, not a rewrite.
2. The Problem Statement
What We Have
Twelve independent, stateless tools. Each takes input, produces output, forgets everything. They normalize spreadsheets, match art to RFQs, build imposition layouts, split PDFs, generate labels, and calculate quotes. Each is rigorously structured: frozen dataclasses, pure decision functions, zero-branching orchestrators, thin Flask adapters.
What an Enterprise MIS Needs
| Capability | Current State | Required State |
|---|---|---|
| Product modeling | Each tool has its own flat data shapes | Unified composable product graph (DAG) |
| Quoting | Single-shot order_quote tool | Millisecond DAG-traversal pricing engine |
| State persistence | None — stateless request/response | Full job lifecycle with audit trail |
| Tool composition | Independent tools, manual chaining | DAG-defined workflows: tool A → tool B → tool C |
| Production tracking | None | Event-sourced station tracking, operator assignment |
| Agentic orchestration | None | AI agent navigating workflow DAGs, deciding next actions |
| Multi-tenant | Single-user Flask apps | Row-level security, customer isolation |
The Core Gap
The gap is not in computation — your pure functions are excellent. The gap is in persistent structure: a shared domain model that connects tools, tracks state over time, and enables composition. DAG event sourcing provides that structure without compromising the existing architecture.
3. Current Architecture: Tool Hub Try5
3.1 The Soft Code Discipline
Every function in this codebase must either make a decision (pure function: data in → data out) or perform plumbing (I/O: read files, write to network). Never both. This rule is enforced by 40+ automated validation scripts.
3.2 Canonical Tool Structure
core/<tool_name>/
├── DECISIONS.md # Business decisions (what, not how)
├── contracts.py # ToolInput, ToolOutput, run()
├── domain/
│ ├── models.py # Frozen dataclasses — plain data only
│ ├── specs.py # Constants, CONTRACTS_VERSION
│ └── schema_checklist.md # Field types, defaults, error contract
├── examples/
│ ├── happy_path.json # Valid input
│ └── invalid_path.json # Error case
├── application/
│ ├── input_validation.py # Payload → domain objects
│ ├── decision_logic.py # Pure functions — the intellectual core
│ ├── output_mapping.py # Domain objects → result dict
│ └── orchestrator.py # Dumb sequencer — max 60 lines, ZERO branching
└── tests/
└── test_decision_logic.py # Pure function tests, no I/O
3.3 What Exists Today (12 Tools)
| Tool | Purpose | Origin | Adapter |
|---|---|---|---|
| sample_tool | Minimal reference example | New | — |
| order_quote | Pricing calculation | New | — |
| label_generator | Avery 5160 label layouts | Migrated | — |
| imposition_tool | 13×19 press sheet imposition | Migrated | Flask |
| ud_rfqs_gdm | UD RFQ parsing + filtering | Migrated | Flask |
| ud_rfq_normalizer | UD RFQ normalization | Migrated | Flask |
| trading_card_sheet_normalizer | Multi-profile spreadsheet normalization | New | Flask + LLM |
| trading_card_subject_extractor | Vision-based text extraction | New | Flask + Ollama |
| card_art_rfq_matcher | RFQ-to-PDF page matching | New | Flask + Ollama |
| imposition_builder_18_up | 18-up imposition packaging | New | Flask + Ollama |
| pdf_splitter | PDF split planning | Migrated | Flask |
3.4 Strengths and Gaps
| Strength (Keep) | Gap (Add) |
|---|---|
| Pure functions, deterministic, testable | No persistent state between tool invocations |
| Frozen immutable dataclasses | No shared domain model across tools |
| Dumb orchestrators (zero branching) | No tool-to-tool composition |
| Thin adapters with separated I/O | No database persistence layer |
| Automated enforcement (40+ checks) | No audit trail or event history |
| Contract versioning in every tool | No multi-tenant isolation |
| Structured error handling (first-class outputs) | No workflow orchestration |
4. The DAG Product Model
4.1 Why DAGs, Not Trees
A tree requires every node to have exactly one parent. But real products violate this constantly. A lamination process applies to multiple layers. A UV coating covers multiple components. In a tree, you’d duplicate the lamination node under each layer — duplicating cost calculations and creating update anomalies.
A Directed Acyclic Graph allows shared nodes with multiple parents. The lamination node exists once, with edges from each layer that passes through it. No duplication. Correct cost rollup. One update to the lamination spec propagates everywhere.
4.2 Universal Node Schema
Every node in the product graph conforms to a single schema, regardless of whether it represents a layer, a process, or an assembly:
@dataclass(frozen=True)
class ConfigNode:
    """Universal product configuration node."""
    id: str                          # UUID
    type: str                        # 'layer', 'process', 'assembly', 'finishing'
    version: int                     # Optimistic concurrency
    attributes: dict                 # Open schema — type-specific properties
    children: tuple[str, ...]        # Child node IDs (directed edges down)
    parents: tuple[str, ...]         # Parent node IDs (directed edges up)
    pricing_function_id: str | None  # Which pricing function to invoke
    tracking: TrackingState | None   # Production tracking (optional)

@dataclass(frozen=True)
class TrackingState:
    status: str                      # 'pending', 'in_progress', 'complete'
    assigned_to: str | None
    station: str | None
    timestamps: dict                 # {'started': ..., 'completed': ...}

@dataclass(frozen=True)
class ProductConfig:
    """Complete product configuration — flat map of nodes."""
    id: str                          # Product config UUID
    root_node_id: str                # Entry point for traversal
    nodes: dict[str, ConfigNode]     # {node_id: ConfigNode} — O(1) lookup
    version: int
    created_at: str
    metadata: dict                   # Customer, job name, etc.
The nodes dictionary stores every node by ID. Navigation follows the children and parents references. This gives you O(1) node lookup, supports the DAG shape (shared nodes with multiple parents), and avoids deeply nested JSON that hits PostgreSQL TOAST compression thresholds.
4.3 Node Type Registry
Instead of hardcoding node types or requiring schema migrations for new product capabilities, a type registry defines what each node type can contain:
@dataclass(frozen=True)
class NodeTypeDefinition:
    type_name: str                        # 'layer', 'process', 'assembly'
    attribute_schema: dict                # JSON Schema for the attributes field
    allowed_child_types: tuple[str, ...]  # What can this node contain?
    default_pricing_function: str | None  # Default pricing strategy
    display_name: str                     # Human-readable label

# Example registry entries
LAYER_TYPE = NodeTypeDefinition(
    type_name="layer",
    attribute_schema={
        "required": ["substrate", "ink_colors", "sides"],
        "properties": {
            "substrate": {"type": "string"},
            "ink_colors": {"type": "string"},  # e.g., "4/0 CMYK"
            "sides": {"type": "integer", "enum": [1, 2]},
            "weight": {"type": "string"},      # e.g., "14pt C2S"
        },
    },
    allowed_child_types=("process", "finishing"),
    default_pricing_function="layer_pricing_v1",
    display_name="Print Layer",
)

PROCESS_TYPE = NodeTypeDefinition(
    type_name="process",
    attribute_schema={
        "required": ["process_name"],
        "properties": {
            "process_name": {"type": "string"},  # 'uv_coating', 'lamination'
            "coverage": {"type": "string"},      # 'full', 'spot'
            "sides": {"type": "string"},         # 'front', 'both'
        },
    },
    allowed_child_types=("process", "finishing"),
    default_pricing_function="process_pricing_v1",
    display_name="Process Step",
)
New product capabilities (embossing, die-cutting, foil stamping) are added by registering new type definitions — no code deployment, no database migration, no schema change.
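How a registry entry is enforced can be sketched as follows. This is a hypothetical, pure-Python stand-in for full JSON Schema validation — validate_attributes and TYPE_MAP are illustrative names not in the source, and only the "required"/"properties"/"enum" subset used by the registry entries above is handled:

```python
# Hypothetical validator sketch: checks a node's attributes against a
# registry entry's attribute_schema. Covers only the JSON-Schema subset
# the registry entries above actually use (required, type, enum).
TYPE_MAP = {"string": str, "integer": int, "number": (int, float), "boolean": bool}

def validate_attributes(attributes: dict, attribute_schema: dict) -> list[str]:
    """Return a list of error strings; an empty list means valid."""
    errors = []
    for field in attribute_schema.get("required", []):
        if field not in attributes:
            errors.append(f"missing required attribute: {field}")
    for field, rules in attribute_schema.get("properties", {}).items():
        if field not in attributes:
            continue  # optional attribute not supplied
        expected = TYPE_MAP.get(rules.get("type", ""), object)
        if not isinstance(attributes[field], expected):
            errors.append(f"{field}: expected {rules['type']}")
        if "enum" in rules and attributes[field] not in rules["enum"]:
            errors.append(f"{field}: must be one of {rules['enum']}")
    return errors
```

Because the validator reads the schema from data, registering a new node type with its own attribute_schema immediately gets validation with no code change — which is the property the registry exists to provide.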
4.4 Graph Operations (Pure Functions)
All graph manipulation lives in core/product_graph/application/ as pure functions:
# graph_ops.py — structural mutations (return new graph, never mutate)

def add_node(config: ProductConfig, node: ConfigNode) -> ProductConfig:
    """Add a node to the product configuration. Returns new config."""
    ...

def connect(config: ProductConfig, parent_id: str, child_id: str) -> ProductConfig:
    """Create a directed edge from parent to child. Returns new config."""
    ...

def remove_node(config: ProductConfig, node_id: str) -> ProductConfig:
    """Remove a node and all its edges. Returns new config."""
    ...

def validate_acyclic(config: ProductConfig) -> list[str]:
    """Detect cycles using three-color DFS. Returns list of errors (empty = valid)."""
    ...

# traversal.py — read-only graph analysis

def topological_sort(config: ProductConfig) -> list[str]:
    """Return node IDs in dependency order (children before parents)."""
    ...

def evaluate_bottom_up(config: ProductConfig, eval_fn) -> dict[str, Any]:
    """Post-order traversal with memoization for shared DAG nodes."""
    ...

def find_ancestors(config: ProductConfig, node_id: str) -> set[str]:
    """All nodes reachable by following parent edges."""
    ...

def find_descendants(config: ProductConfig, node_id: str) -> set[str]:
    """All nodes reachable by following child edges."""
    ...
All graph operations return new ProductConfig instances (frozen dataclasses) rather than mutating in place. They live in core/, are tested without databases, and the orchestrator calls them in sequence with zero branching. The existing Soft Code discipline applies without modification.
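The bodies above are elided, but the core algorithm is worth seeing concretely. Below is a sketch of topological_sort with cycle detection via three-color DFS (the same technique validate_acyclic's docstring names), written against a plain {node_id: children_ids} dict rather than the full ProductConfig — an assumption made to keep the example self-contained:

```python
# Sketch, assuming the graph is given as {node_id: list of child ids}.
# Post-order DFS naturally yields children before parents, as the
# topological_sort contract above requires.
def topological_sort(children: dict[str, list[str]]) -> list[str]:
    """Children-before-parents order. Raises ValueError on a cycle."""
    WHITE, GRAY, BLACK = 0, 1, 2  # three-color DFS states
    color = {n: WHITE for n in children}
    order: list[str] = []

    def visit(node: str) -> None:
        if color[node] == GRAY:          # back edge — cycle
            raise ValueError(f"cycle detected at {node}")
        if color[node] == BLACK:         # shared DAG node, already emitted
            return
        color[node] = GRAY
        for child in children[node]:
            visit(child)
        color[node] = BLACK
        order.append(node)               # post-order: after all children

    for node in children:
        visit(node)
    return order
```

Note how a shared node (the lamination example from 4.1) is emitted exactly once: the second parent to reach it finds it BLACK and skips it.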
5. Event Sourcing
5.1 Why Immutable Events
Traditional MIS systems use mutable status fields: UPDATE jobs SET status = 'printing' WHERE id = 123. The previous state is destroyed. You cannot answer questions like:
- How long did the job spend in prepress?
- Who moved it to the printing station?
- Was it ever sent back for rework?
- What was the original configuration before the customer changed it?
Event sourcing records every state change as an immutable, append-only event. The event stream is the source of truth. Current state is a derived projection — a fold over the event stream.
# Instead of: UPDATE jobs SET status = 'printing'
# You append:
JobMovedToStation(
    job_id="job-123",
    station="heidelberg-xl-106",
    operator="maria.garcia",
    timestamp="2026-03-01T14:23:00Z",
    previous_station="prepress-2",
)
5.2 Event Anatomy
@dataclass(frozen=True)
class DomainEvent:
    """Base structure for all domain events."""
    event_id: str    # UUID — globally unique
    stream_id: str   # e.g., "job:abc-123" or "product:def-456"
    version: int     # Monotonic within stream (optimistic concurrency)
    event_type: str  # e.g., "JobCreated", "NodeAdded", "QuoteGenerated"
    timestamp: str   # ISO 8601
    payload: dict    # Event-specific data
    metadata: dict   # Actor, correlation_id, causation_id

# Concrete event examples:

@dataclass(frozen=True)
class JobCreated:
    job_id: str
    customer_id: str
    product_config_id: str
    source_tool: str  # Which tool initiated this
    source_files: tuple[str, ...]

@dataclass(frozen=True)
class NormalizationCompleted:
    job_id: str
    tool: str         # "trading_card_sheet_normalizer"
    row_count: int
    warning_count: int
    error_count: int
    profiles_detected: tuple[str, ...]

@dataclass(frozen=True)
class QuoteGenerated:
    job_id: str
    product_config_id: str
    total_price: str  # Decimal as string for precision
    line_items: tuple[dict, ...]
    rate_table_versions: dict  # Which rate tables were used

@dataclass(frozen=True)
class ProductionStageCompleted:
    job_id: str
    stage: str        # "imposition", "printing", "cutting"
    operator: str
    station: str
    waste_sheets: int
    duration_seconds: int
5.3 Event Streams and Replay
Events are grouped into streams by aggregate root (typically a job or a product configuration). Each stream is an ordered sequence:
# Event stream for job "job-abc-123":
# v1: JobCreated {customer: "Acme Cards", product_config: "cfg-789"}
# v2: NormalizationCompleted {rows: 45, warnings: 2}
# v3: ArtMatchCompleted {matched: 43, unmatched: 2}
# v4: ImpositionBuilt {sheets: 3, layout: "18-up"}
# v5: QuoteGenerated {total: "1,234.50"}
# v6: QuoteApproved {approved_by: "john.smith"}
# v7: ProductionStarted {station: "prepress-1"}
# ...
# Current state = fold(events, initial_state, apply_fn)
def rebuild_job_state(events: list[DomainEvent]) -> JobState:
    """Replay events to reconstruct current state."""
    state = JobState.empty()
    for event in events:
        state = apply_event(state, event)
    return state

def apply_event(state: JobState, event: DomainEvent) -> JobState:
    """Pure function: apply one event to produce new state."""
    match event.event_type:
        case "JobCreated":
            return state.with_status("created").with_customer(event.payload["customer_id"])
        case "QuoteApproved":
            return state.with_status("approved")
        case "ProductionStarted":
            return state.with_status("in_production").with_station(event.payload["station"])
        case _:
            return state  # Unknown events are safely ignored
This replay logic lives in core/event_store/application/decision_logic.py and is tested with plain unit tests. The Soft Code discipline applies perfectly.
5.4 Projections (Materialized Read Models)
While event streams are the source of truth, querying them directly is expensive for operational views. Projections are pre-computed read models maintained by asynchronously processing new events:
| Projection | Source Events | Purpose |
|---|---|---|
| Job Board | JobCreated, StatusChanged, StageCompleted | Dashboard showing all jobs with current status |
| Station Queue | ProductionStarted, StageCompleted | What’s waiting at each machine/station |
| Customer History | JobCreated, QuoteGenerated, QuoteApproved | All jobs for a customer, quotes, approvals |
| Pricing Audit | QuoteGenerated | Which rate tables were used, when, for what |
| Waste Report | StageCompleted | Waste sheets per station, operator, time period |
Projections are disposable and rebuildable. If a projection is corrupted or you change its schema, delete it and replay all events. The event stream is never lost.
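The "delete and replay" property can be sketched in a few lines. This is an illustrative fold over raw event dicts into an in-memory stand-in for the job-board projection — rebuild_job_board and the row shape are hypothetical names, and only two event types are handled:

```python
# Sketch: rebuild a job-board read model from scratch by folding events.
# The row dict loosely mirrors the job-board projection's columns.
def rebuild_job_board(events: list[dict]) -> dict[str, dict]:
    """Replay all events, in stream/version order, into {job_id: row}."""
    board: dict[str, dict] = {}
    for e in sorted(events, key=lambda e: (e["stream_id"], e["version"])):
        row = board.setdefault(e["stream_id"], {"status": None, "current_station": None})
        if e["event_type"] == "JobCreated":
            row["status"] = "created"
        elif e["event_type"] == "ProductionStarted":
            row["status"] = "in_production"
            row["current_station"] = e["payload"]["station"]
        # Unknown event types leave the row untouched — safe to replay anything
    return board
```

Because the function starts from an empty dict every time, changing the projection's shape is just editing this fold and replaying; the event stream itself is never touched.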
6. CQRS Pattern
6.1 Command Side (Writes)
Commands represent intentions: “create this job,” “approve this quote,” “move to next station.” The command handler validates, executes business logic (pure functions from core/), and emits events:
# Command flow:
# 1. Receive command (adapter layer — Flask route or API)
# 2. Load current state (replay events for this stream)
# 3. Validate command against current state (core decision function)
# 4. Execute business logic (core decision function)
# 5. Emit new events (adapter persists to event store)
def handle_approve_quote(cmd: ApproveQuoteCommand, events: list[DomainEvent]) -> list[DomainEvent]:
    """Pure function: given current events + command, return new events."""
    state = rebuild_job_state(events)
    errors = validate_approval(state, cmd)  # Can this quote be approved?
    if errors:
        return [CommandRejected(errors=errors)]
    return [QuoteApproved(
        job_id=cmd.job_id,
        approved_by=cmd.approver,
        approved_price=state.quoted_price,
    )]
6.2 Query Side (Reads)
Queries never touch the event store. They read from denormalized projection tables optimized for specific views:
# Query side — reads from projections, never from event store
# Each projection table is optimized for its use case:
# Job board: SELECT * FROM job_board WHERE status = 'in_production' ORDER BY priority
# Station: SELECT * FROM station_queue WHERE station = 'heidelberg-xl-106'
# Customer: SELECT * FROM customer_history WHERE customer_id = 'acme-cards'
# Agent: SELECT * FROM job_board WHERE next_action IS NOT NULL
6.3 Eventual Consistency
Projections are updated asynchronously as events arrive. There is a small delay (typically 1–5 seconds) between an event being written and the projection reflecting it.
7. Pricing Engine
7.1 The Three-Phase Pattern: Pre-load, Evaluate, Aggregate
Legacy MIS systems price composable products by querying the database for each variable: substrate cost, ink cost, finishing cost, shipping cost — one query per variable per component. A complex card with 20 configuration nodes triggers 50–200+ database round trips.
The DAG pricing engine inverts this:
| Phase | I/O? | Operations | Queries |
|---|---|---|---|
| 1. Pre-load | Yes (adapter) | Load product config + all relevant rate tables | 1–3 total |
| 2. Evaluate | No (core) | Pure arithmetic on cached data, traverse DAG bottom-up | 0 |
| 3. Aggregate | No (core) | Parent nodes roll up children results | 0 |
This transforms quoting from I/O-bound (seconds) to CPU-bound (milliseconds).
7.2 Bottom-Up DAG Traversal with Memoization
def evaluate_pricing(config: ProductConfig, rate_context: RateContext) -> PricingResult:
    """Bottom-up (post-order) DAG traversal with memoization."""
    memo: dict[str, NodePrice] = {}
    order = topological_sort(config)  # Children before parents
    for node_id in order:
        node = config.nodes[node_id]
        if node_id in memo:
            continue  # Already evaluated (DAG shared node)
        # Evaluate children first (guaranteed by topological order)
        child_prices = [memo[cid] for cid in node.children]
        # Look up pricing function from registry
        pricing_fn = get_pricing_function(node.pricing_function_id)
        # Pure computation — no database access
        node_price = pricing_fn(
            node=node,
            children=child_prices,
            rates=rate_context,
        )
        memo[node_id] = node_price
    return PricingResult(
        total=memo[config.root_node_id].total,
        breakdown=memo,
    )
The memo cache ensures each node is computed exactly once, regardless of how many paths lead to it.
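What a pricing function plugged into this evaluator might look like can be sketched as follows. layer_pricing_v1's body, the dict-shaped node, and the RateContext-as-dict are all assumptions for illustration; the source only fixes the (node, children, rates) signature. Decimal keeps money arithmetic exact, matching the "Decimal as string" convention in QuoteGenerated:

```python
# Hypothetical pricing function matching the pricing_fn(node, children, rates)
# call in evaluate_pricing. Phase 1 has already pre-loaded `rates`, so this
# is a pure in-memory lookup — zero database access.
from decimal import Decimal

def layer_pricing_v1(node, children, rates) -> Decimal:
    """Price one layer node, then roll up its already-priced children."""
    substrate = node["attributes"]["substrate"]
    sheets = Decimal(node["attributes"]["sheets"])
    own_cost = Decimal(rates["substrate"][substrate]) * sheets  # cached rate
    return own_cost + sum(children, Decimal("0"))               # aggregate phase
```

Because children arrive already priced (topological order guarantees it), each function only prices its own node and adds — the roll-up falls out of the traversal.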
7.3 Three-Layer Caching Strategy
| Cache Layer | Scope | TTL / Invalidation | What It Caches |
|---|---|---|---|
| L1: Rate Tables | Application-wide | 1-hour TTL + explicit invalidation on update | Substrate prices, ink costs, finishing rates |
| L2: Subtree Pricing | Per-product | Hash of config document — invalidated on any mutation | Previously computed subtree results |
| L3: Node Memoization | Per-request | In-memory, discarded after request | Prevents redundant DAG node evaluation |
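The L2 invalidation rule ("hash of config document") can be made concrete. The sketch below computes a deterministic content hash over a node and its descendants, using a simplified dict node shape; subtree_hash is an illustrative name, not a function from the source:

```python
# Sketch of the L2 cache key: a stable SHA-256 over a node's subtree.
# Any change to the node or any descendant changes the hash, so the
# cached subtree price is invalidated automatically on mutation.
import hashlib
import json

def subtree_hash(nodes: dict[str, dict], node_id: str) -> str:
    """Deterministic content hash of node_id and everything below it."""
    node = nodes[node_id]
    child_hashes = [subtree_hash(nodes, c) for c in sorted(node["children"])]
    content = json.dumps(
        {"attrs": node["attributes"], "children": child_hashes},
        sort_keys=True,  # dict key order must not affect the hash
    )
    return hashlib.sha256(content.encode()).hexdigest()
```

The cache is then a plain {hash: NodePrice} map: a hit means the subtree is byte-identical to when it was last priced.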
8. Storage: Supabase Postgres + JSONB
8.1 Why This Stack
The article recommends a hybrid storage strategy: JSONB documents for hierarchical product configurations, relational tables for stable reference data. Supabase gives you exactly this as a managed service:
- PostgreSQL — battle-tested, ACID-compliant, with JSONB, GIN indexes, and array types
- Managed operations — no ops burden while building; focus on architecture, not infrastructure
- Row Level Security (RLS) — built-in multi-tenant isolation at the database level
- Realtime subscriptions — push UI updates when projections change (with caveats)
- Edge Functions — lightweight server-side logic for event processing
- Migration path — standard Postgres; can move to self-hosted if you outgrow Supabase
8.2 Hybrid Storage Strategy
| Data Type | Storage | Why |
|---|---|---|
| Product configurations (DAG nodes) | Relational rows + JSONB attributes | One row per node, JSONB for flexible properties |
| Event streams | Append-only table + JSONB payload | Immutable events, indexed by stream + version |
| Rate tables | Relational columns | Stable schema, frequently queried, indexed |
| Customers, operators | Relational columns | Standard entity tables, RLS policies |
| Projections | Denormalized tables + JSONB summary | Disposable read models, trigger-maintained |
8.3 Complete Schema Design
-- ═══════════════════════════════════════════════════════════════
-- PRODUCT CONFIGURATION (DAG)
-- ═══════════════════════════════════════════════════════════════
-- One row per node, NOT one giant document per product.
-- Each node's attributes JSONB stays small (well under the TOAST threshold).
CREATE TABLE config_nodes (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
product_id UUID NOT NULL, -- Which product this node belongs to
type TEXT NOT NULL, -- 'layer', 'process', 'assembly', 'finishing'
version INT NOT NULL DEFAULT 1, -- Optimistic concurrency
attributes JSONB NOT NULL DEFAULT '{}', -- Type-specific properties (open schema)
children UUID[] DEFAULT '{}', -- Child node IDs (directed edges)
parents UUID[] DEFAULT '{}', -- Parent node IDs (directed edges)
pricing_fn TEXT, -- Pricing function identifier
created_at TIMESTAMPTZ DEFAULT now(),
updated_at TIMESTAMPTZ DEFAULT now()
);
-- Product-level aggregation
-- NOTE: the customers table (defined under Reference Tables below) must be
-- created before this FK is applied.
CREATE TABLE products (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
customer_id UUID NOT NULL REFERENCES customers(id),
name TEXT NOT NULL,
root_node_id UUID REFERENCES config_nodes(id),
version INT NOT NULL DEFAULT 1,
status TEXT NOT NULL DEFAULT 'draft', -- 'draft', 'quoted', 'approved', 'in_production'
metadata JSONB DEFAULT '{}', -- Job name, PO number, etc.
created_at TIMESTAMPTZ DEFAULT now(),
updated_at TIMESTAMPTZ DEFAULT now()
);
-- Indexes
CREATE INDEX idx_nodes_product ON config_nodes(product_id);
CREATE INDEX idx_nodes_type ON config_nodes(type);
CREATE INDEX idx_nodes_attrs ON config_nodes USING GIN(attributes);
CREATE INDEX idx_products_customer ON products(customer_id);
CREATE INDEX idx_products_status ON products(status);
-- ═══════════════════════════════════════════════════════════════
-- NODE TYPE REGISTRY
-- ═══════════════════════════════════════════════════════════════
CREATE TABLE node_types (
type_name TEXT PRIMARY KEY,
display_name TEXT NOT NULL,
attribute_schema JSONB NOT NULL, -- JSON Schema for attributes validation
allowed_children TEXT[] DEFAULT '{}', -- Allowed child type_names
default_pricing TEXT, -- Default pricing function ID
created_at TIMESTAMPTZ DEFAULT now()
);
-- ═══════════════════════════════════════════════════════════════
-- IMMUTABLE EVENT LOG
-- ═══════════════════════════════════════════════════════════════
CREATE TABLE events (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
stream_id TEXT NOT NULL, -- e.g., 'job:abc-123', 'product:def-456'
version INT NOT NULL, -- Monotonic within stream
event_type TEXT NOT NULL, -- e.g., 'JobCreated', 'QuoteGenerated'
payload JSONB NOT NULL, -- Event-specific data
metadata JSONB DEFAULT '{}', -- Actor, correlation_id, causation_id
created_at TIMESTAMPTZ DEFAULT now(),
UNIQUE(stream_id, version) -- Optimistic concurrency control
);
CREATE INDEX idx_events_stream ON events(stream_id, version);
CREATE INDEX idx_events_type ON events(event_type);
CREATE INDEX idx_events_created ON events(created_at);
-- Prevent mutation: events are append-only
CREATE OR REPLACE FUNCTION prevent_event_mutation()
RETURNS TRIGGER AS $$
BEGIN
RAISE EXCEPTION 'Events are immutable. Cannot update or delete.';
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER events_immutable
BEFORE UPDATE OR DELETE ON events
FOR EACH ROW EXECUTE FUNCTION prevent_event_mutation();
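On the adapter side, the UNIQUE(stream_id, version) constraint is what makes optimistic concurrency work: two writers racing on the same stream cannot both insert version N+1. The sketch below models that behavior with an in-memory store — InMemoryEventStore and ConcurrencyError are illustrative names; a real adapter would issue the INSERT and translate the database's unique-violation error the same way:

```python
# In-memory sketch of append-with-optimistic-concurrency, mirroring the
# UNIQUE(stream_id, version) constraint on the events table.
class ConcurrencyError(Exception):
    pass

class InMemoryEventStore:
    def __init__(self) -> None:
        self._events: list[dict] = []

    def append(self, stream_id: str, expected_version: int, event: dict) -> None:
        """Append at expected_version + 1; fail if another writer got there first."""
        current = max(
            (e["version"] for e in self._events if e["stream_id"] == stream_id),
            default=0,
        )
        if current != expected_version:
            raise ConcurrencyError(f"{stream_id} is at v{current}, expected v{expected_version}")
        self._events.append({"stream_id": stream_id, "version": current + 1, **event})

    def load(self, stream_id: str) -> list[dict]:
        """Return the stream in version order, ready for replay."""
        return sorted(
            (e for e in self._events if e["stream_id"] == stream_id),
            key=lambda e: e["version"],
        )
```

The command handler loads a stream, replays it, and appends with the version it read; a ConcurrencyError simply means "reload and retry".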
-- ═══════════════════════════════════════════════════════════════
-- RATE TABLES (Relational — Stable Schema)
-- ═══════════════════════════════════════════════════════════════
CREATE TABLE rate_tables (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
name TEXT NOT NULL, -- 'substrate_costs', 'ink_rates', 'finishing_rates'
category TEXT NOT NULL, -- 'material', 'labor', 'equipment', 'finishing'
rates JSONB NOT NULL, -- Structured pricing data
quantity_breaks JSONB, -- Quantity-based pricing tiers
valid_from TIMESTAMPTZ NOT NULL,
valid_until TIMESTAMPTZ, -- NULL = currently active
version INT NOT NULL DEFAULT 1,
created_at TIMESTAMPTZ DEFAULT now(),
updated_at TIMESTAMPTZ DEFAULT now()
);
CREATE INDEX idx_rates_category ON rate_tables(category);
CREATE INDEX idx_rates_valid ON rate_tables(valid_from, valid_until);
-- ═══════════════════════════════════════════════════════════════
-- PROJECTIONS (Denormalized Read Models — Disposable & Rebuildable)
-- ═══════════════════════════════════════════════════════════════
-- Job board projection
CREATE TABLE proj_job_board (
job_id TEXT PRIMARY KEY,
customer_name TEXT,
product_name TEXT,
status TEXT NOT NULL,
current_station TEXT,
assigned_to TEXT,
quoted_price NUMERIC(12,2),
priority INT DEFAULT 0,
summary JSONB DEFAULT '{}', -- Aggregated stats
last_event_at TIMESTAMPTZ,
updated_at TIMESTAMPTZ DEFAULT now()
);
-- Station queue projection
CREATE TABLE proj_station_queue (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
station TEXT NOT NULL,
job_id TEXT NOT NULL,
stage TEXT NOT NULL,
priority INT DEFAULT 0,
queued_at TIMESTAMPTZ DEFAULT now(),
estimated_duration_min INT
);
CREATE INDEX idx_station_queue ON proj_station_queue(station, priority);
-- Customer history projection
CREATE TABLE proj_customer_history (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
customer_id UUID NOT NULL,
job_id TEXT NOT NULL,
event_type TEXT NOT NULL,
summary JSONB,
occurred_at TIMESTAMPTZ
);
CREATE INDEX idx_customer_history ON proj_customer_history(customer_id, occurred_at);
-- ═══════════════════════════════════════════════════════════════
-- REFERENCE TABLES
-- ═══════════════════════════════════════════════════════════════
CREATE TABLE customers (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
name TEXT NOT NULL,
email TEXT,
metadata JSONB DEFAULT '{}',
created_at TIMESTAMPTZ DEFAULT now()
);
CREATE TABLE operators (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
name TEXT NOT NULL,
station TEXT,
role TEXT DEFAULT 'operator',
active BOOLEAN DEFAULT true,
created_at TIMESTAMPTZ DEFAULT now()
);
-- ═══════════════════════════════════════════════════════════════
-- ROW LEVEL SECURITY (Multi-Tenant Foundation)
-- ═══════════════════════════════════════════════════════════════
ALTER TABLE products ENABLE ROW LEVEL SECURITY;
ALTER TABLE config_nodes ENABLE ROW LEVEL SECURITY;
ALTER TABLE events ENABLE ROW LEVEL SECURITY;
-- Example policy: users see only their organization's products
-- (Uncomment and customize when multi-tenant is needed)
-- CREATE POLICY products_tenant_isolation ON products
-- USING (metadata->>'tenant_id' = current_setting('app.current_tenant'));
8.4 TOAST and JSONB Performance
PostgreSQL applies TOAST (The Oversized-Attribute Storage Technique) compression to values exceeding ~2KB. TOAST-compressed values are stored in a separate table, requiring additional disk seeks on read.
The solution: one row per node. The schema above stores each ConfigNode as a separate row. The attributes JSONB per node is small (100–500 bytes typically). The flat-map is reconstructed in Python:
# Adapter layer: load product config from Supabase
def load_product_config(product_id: str) -> ProductConfig:
    """Load all nodes for a product and assemble into ProductConfig."""
    product = supabase.table("products").select("*").eq("id", product_id).single().execute().data
    nodes = supabase.table("config_nodes").select("*").eq("product_id", product_id).execute()
    # Build flat-map in Python (core pure function)
    node_map = {row["id"]: to_config_node(row) for row in nodes.data}
    return ProductConfig(
        id=product["id"],
        root_node_id=product["root_node_id"],
        nodes=node_map,
        version=product["version"],
        created_at=product["created_at"],
        metadata=product["metadata"],
    )

# Total queries: 2 (product + nodes), regardless of graph complexity.
8.5 Row Level Security for Multi-Tenant
When you need customer isolation (customer A cannot see customer B’s products), Supabase’s Row Level Security adds a WHERE clause to every query automatically:
-- Every query on products automatically gets:
-- WHERE metadata->>'tenant_id' = auth.jwt()->>'tenant_id'
-- No code changes needed — the database enforces isolation.
This is critical for an enterprise MIS. You add multi-tenant isolation at the database level, not in application code. Every query, every projection, every event stream is automatically scoped.
8.6 Realtime and Edge Functions: Caveats
Supabase Realtime
- Processes database changes on a single thread to maintain order
- Checks RLS per subscriber (100 subscribers × 1 insert = 100 RLS checks)
- Use for: UI notifications (“job status changed”), live dashboards
- Do NOT use for: Projection rebuilds, data consistency, event processing
Supabase Edge Functions
- Lightweight server-side handlers (Deno runtime)
- Good for: Webhook receivers, simple event processing, authentication
- Not for: Heavy computation (batch repricing, bulk imports, PDF generation)
- Your Flask adapters handle heavy lifting; Edge Functions complement, not replace
The Right Pattern for Projections
-- Use PostgreSQL triggers to maintain projections server-side.
-- This runs in the database, not in Edge Functions or Realtime.
CREATE OR REPLACE FUNCTION update_job_board_projection()
RETURNS TRIGGER AS $$
BEGIN
INSERT INTO proj_job_board (job_id, status, last_event_at, updated_at)
VALUES (
NEW.stream_id,
NEW.payload->>'status',
NEW.created_at,
now()
)
ON CONFLICT (job_id)
DO UPDATE SET
status = COALESCE(NEW.payload->>'status', proj_job_board.status),
last_event_at = NEW.created_at,
updated_at = now();
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER trg_update_job_board
AFTER INSERT ON events
FOR EACH ROW
WHEN (NEW.event_type IN ('JobCreated', 'StatusChanged', 'StageCompleted'))
EXECUTE FUNCTION update_job_board_projection();
8.7 Migration Path
Supabase is standard PostgreSQL. If you outgrow the managed service:
- pg_dump your entire database
- pg_restore to any Postgres instance (AWS RDS, DigitalOcean, self-hosted)
- Update connection strings in your adapter layer
- Core never changes — it doesn’t know Supabase exists
9. Integration Architecture: DAG ES + Tool Hub
9.1 Layer Mapping
9.2 Tool Composition via Workflow DAG
Today, your tools are independent. With a workflow DAG, they compose into production pipelines:
Each node in this workflow DAG is a tool invocation. Edges define data flow. The core/job_workflow/ tool validates the DAG, determines execution order, and tracks completion. This is the foundation for agentic behavior — an AI agent reads this graph, checks event state, and decides what to run next.
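The decision the agent (or the core/job_workflow/ tool) makes at each step can be sketched as a pure function: given the workflow graph and the set of completed stages, which tools are runnable now? The {tool: upstream_dependencies} shape and runnable_tools are illustrative assumptions; the tool names come from section 3.3:

```python
# Sketch: a workflow DAG as {tool_name: set of upstream tool names}.
# A tool is runnable when every upstream dependency has completed
# (derivable from the event stream) and it has not itself run.
def runnable_tools(workflow: dict[str, set[str]], completed: set[str]) -> set[str]:
    """Pure decision function: which workflow nodes can execute next."""
    return {
        tool
        for tool, deps in workflow.items()
        if tool not in completed and deps <= completed  # all deps satisfied
    }
```

This keeps the agent loop dumb in the Soft Code sense: read events, rebuild `completed`, call the pure function, dispatch whatever it returns.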
9.3 Contract Evolution
Existing tool contracts (ToolInput / ToolOutput) gain optional event metadata:
# Enhanced ToolOutput — backward compatible
@dataclass(frozen=True)
class ToolOutput:
    success: bool
    result: dict
    errors: list[str]
    warnings: list[str]
    contracts_version: str
    # NEW — optional, backward-compatible additions:
    events: tuple[dict, ...] = ()      # Domain events emitted by this run
    job_id: str | None = None          # If this run is part of a tracked job
    correlation_id: str | None = None  # For tracing across tool invocations
Existing tools continue working with events=(). New tools or enhanced versions emit events. The adapter layer decides whether to persist them.
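The adapter-side decision described above — persist emitted events, ignore their absence — can be sketched in a few lines. handle_tool_result and the persist_event callable are hypothetical stand-ins for the Supabase event-persistence adapter:

```python
# Sketch of the adapter integration point after run_contract():
# persist whatever events the tool emitted, tolerating legacy outputs
# that predate the events field entirely.
def handle_tool_result(output, persist_event) -> int:
    """Persist emitted events (if any); return how many were stored."""
    events = getattr(output, "events", ()) or ()  # legacy tools: no field
    for event in events:
        persist_event(event)  # adapter I/O — core never sees this call
    return len(events)
```

Because the default is an empty tuple and missing attributes are tolerated, both enhanced and legacy tools flow through the same code path.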
9.4 Adapter Extensions
The new adapters/supabase/ directory follows Soft Code conventions:
adapters/supabase/
├── connection.py # Supabase client initialization
├── graph_persistence.py # Load/save ProductConfig ↔ config_nodes table
├── event_persistence.py # Append events, load streams, optimistic concurrency
├── rate_loader.py # Batch-load rate tables into RateContext
├── projection_reader.py # Read from proj_* tables
├── RUNTIME_DEPENDENCIES.md # supabase-py, python-dotenv
└── tests/
└── test_persistence.py # Integration tests (requires Supabase connection)
Flask adapters remain unchanged. They gain an optional integration point: after calling run_contract(), they can pass events to the Supabase adapter for persistence. But this is additive — existing tools work without it.
10. Phased Roadmap
Following the Strangler Fig migration pattern from the article: build incrementally, validate at each stage, never attempt big-bang migration.
Phase 0: Foundation (Build First)
Goal: Create the structural primitives. No database yet. Pure functions only.
| New Core Tool | Purpose | Deliverables |
|---|---|---|
| `core/product_graph/` | ConfigNode DAG — add, remove, connect, validate, traverse | DECISIONS.md, models.py, graph_ops.py, traversal.py, tests |
| `core/event_store/` | Immutable event log — append, replay, apply, project | DECISIONS.md, models.py, event_log.py, tests |
| `core/pricing_engine/` | Bottom-up DAG evaluation with pre-loaded rate context | DECISIONS.md, models.py, evaluator.py, tests |
Key constraint: Each tool follows the existing 6-step workflow. All pass preflight.py. Zero I/O in core. No database dependency.
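To make the `core/event_store/` idea concrete before any database exists, replay can be written as a pure fold over an in-memory list. The event shapes and handlers below are hypothetical sketches, not the defined API.

```python
def apply_event(state: dict, event: dict) -> dict:
    """Pure transition: returns a new state dict, never mutates the input."""
    handlers = {
        "JobCreated": lambda s, e: {**s, "status": "created",
                                    "job_id": e["payload"]["job_id"]},
        "NormalizationCompleted": lambda s, e: {**s, "status": "normalized"},
        "SheetImposed": lambda s, e: {**s, "status": "imposed",
                                      "sheets": s.get("sheets", 0) + 1},
    }
    handler = handlers.get(event["event_type"])
    return handler(state, event) if handler else state  # unknown events are ignored

def replay(events: list[dict]) -> dict:
    """Current state is a left fold over the stream — no I/O, trivially testable."""
    state: dict = {}
    for event in events:
        state = apply_event(state, event)
    return state

stream = [
    {"event_type": "JobCreated", "payload": {"job_id": "abc-123"}},
    {"event_type": "NormalizationCompleted", "payload": {}},
    {"event_type": "SheetImposed", "payload": {}},
]
print(replay(stream))  # {'status': 'imposed', 'job_id': 'abc-123', 'sheets': 1}
```

Because `replay` is a pure function, Phase 0 tests need nothing but lists of dicts; persistence arrives in Phase 1 without touching this core.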
Phase 1: Connect (Persist & Compose)
Goal: Connect foundation to Supabase. Existing tools start emitting events.
- Create `adapters/supabase/` persistence layer
- Deploy schema to Supabase (tables, indexes, triggers, RLS)
- Build Flask adapter for product graph visualization/editing
- Enhance `trading_card_sheet_normalizer` output to create ConfigNodes
- Enhance `order_quote` to use pricing engine’s DAG traversal
- Run in shadow mode: new engine parallel to existing, validate accuracy
Phase 2: Track (Event Sourcing for Production)
Goal: Full production lifecycle tracking with audit trails.
- Define event types for each production stage
- Build projection tables and triggers (job board, station queues)
- Create operator-facing dashboards (station queue views)
- Jobs emit events: `JobCreated`, `NormalizationCompleted`, `SheetImposed`, etc.
- Build `core/job_workflow/` tool for workflow DAG definition and sequencing
- Historical reporting from event streams (waste analysis, throughput, SLA tracking)
Phase 3: Agentic Orchestration
Goal: AI agent that navigates workflow DAGs and decides next actions.
- CQRS command bus: tool invocations become validated commands
- Denormalized query projections for agent consumption
- Agent reads workflow DAG + event state + projections
- Agent decides: “Job X is normalized, art is matched, quote approved → build imposition next”
- Human-in-the-loop gates: agent proposes, human approves critical decisions
- Agent learns from event history: which workflows succeed, common failure points
11. Trade-offs and Risks
Advantages
| Benefit | Impact |
|---|---|
| Flexible product schemas | New product types without code deployment or schema migration |
| Millisecond quoting | 1–3 queries total, CPU-bound arithmetic instead of I/O-bound lookups |
| Complete audit trails | Every decision, every state change, every actor recorded permanently |
| Tool composability | Workflow DAGs define how tools chain together |
| Replay and debugging | Reproduce any historical state by replaying events to that point |
| Multi-tenant ready | RLS at database level, not application code |
| Agentic foundation | Event state + workflow DAGs are perfectly structured for AI agent consumption |
Costs and Risks
| Cost | Mitigation |
|---|---|
| Event schema evolution — once emitted, changing event shapes requires migration | Version events from day one. Use upcasting (transform old events to new schema on read). Keep payloads backward-compatible. |
| Operational complexity — event consumers, projections, eventual consistency | Start with Postgres triggers (simple, in-database). Add external consumers only when needed. |
| Debugging surface area — tracing event streams instead of call stacks | Correlation IDs on every event. Build an event explorer tool early. |
| Eventual consistency — projections may lag 1–5 seconds behind events | Acceptable for operational views. Only the event stream must be immediately consistent. |
| Database dependency — moving from stateless to stateful | Supabase is managed. Core remains pure and testable without database. Only adapters touch DB. |
| Specialized talent — event sourcing requires different mental model | This document. Start simple. Build incrementally. Phase 0 requires no event sourcing knowledge. |
| Ad-hoc queries harder — event stores are less ergonomic than flat tables for analytics | Projections are denormalized tables — query them like normal SQL. Event store is for writes + replay. |
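The upcasting mitigation from the table can be sketched as a read-time transform chain: stored events are never rewritten; old shapes are lifted to the current schema as they are loaded. The `schema_version` field name and the v1-to-v2 payload change below are hypothetical.

```python
def upcast_v1_to_v2(event: dict) -> dict:
    """Hypothetical migration: v1 stored a bare 'qty'; v2 nests it under 'quantities'."""
    payload = dict(event["payload"])
    payload["quantities"] = {"ordered": payload.pop("qty", None)}
    return {**event, "schema_version": 2, "payload": payload}

# One upcaster per (event_type, from_version); chains compose automatically.
UPCASTERS = {("SheetImposed", 1): upcast_v1_to_v2}

def upcast(event: dict) -> dict:
    """Applied on read; the stored stream stays immutable."""
    while (fn := UPCASTERS.get((event["event_type"],
                                event.get("schema_version", 1)))):
        event = fn(event)
    return event

old = {"event_type": "SheetImposed", "schema_version": 1, "payload": {"qty": 500}}
print(upcast(old)["payload"])  # {'quantities': {'ordered': 500}}
```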
12. What Changes, What Stays
What Does NOT Change
- `core/` stays pure — no I/O, no frameworks, frozen dataclasses
- Orchestrators stay dumb — zero branching, recipe cards
- Adapters stay thin — transport and I/O only
- Existing 12 tools keep working — DAG and events are additive layers
- The 6-step workflow stays — every new tool follows DECISIONS → shapes → functions → orchestrator → adapter → verification
- Preflight checks still run — add new checks, don’t replace
- `tools_registry.json` structure stays
What DOES Change
- New shared domain models: `ConfigNode`, `DomainEvent`, `ProductConfig` become cross-tool types
- New infrastructure core tools: `product_graph`, `event_store`, `pricing_engine` are tools that other tools depend on
- New adapter type: `adapters/supabase/` for database persistence
- New composition layer: `core/job_workflow/` for tool-to-tool DAGs
- Registry evolution: add `depends_on`, `emits_events`, `supports_graph` to `tools_registry.json`
- ToolOutput evolution: optional `events`, `job_id`, `correlation_id` fields
13. Reference: Complete Data Models
Product Graph Domain
```python
# core/product_graph/domain/models.py
from dataclasses import dataclass, field

@dataclass(frozen=True)
class ConfigNode:
    id: str
    type: str
    version: int
    attributes: dict
    children: tuple[str, ...]
    parents: tuple[str, ...]
    pricing_function_id: str | None = None
    tracking: "TrackingState | None" = None

@dataclass(frozen=True)
class TrackingState:
    status: str
    assigned_to: str | None = None
    station: str | None = None
    timestamps: dict = field(default_factory=dict)

@dataclass(frozen=True)
class ProductConfig:
    id: str
    root_node_id: str
    nodes: dict  # {str: ConfigNode} — use dict for frozen compatibility
    version: int
    created_at: str
    metadata: dict

@dataclass(frozen=True)
class NodeTypeDefinition:
    type_name: str
    display_name: str
    attribute_schema: dict
    allowed_child_types: tuple[str, ...]
    default_pricing_function: str | None = None
```
Event Store Domain
```python
# core/event_store/domain/models.py
from dataclasses import dataclass

@dataclass(frozen=True)
class DomainEvent:
    event_id: str
    stream_id: str
    version: int
    event_type: str
    timestamp: str
    payload: dict
    metadata: dict

@dataclass(frozen=True)
class EventStream:
    stream_id: str
    events: tuple[DomainEvent, ...]
    current_version: int

@dataclass(frozen=True)
class Snapshot:
    stream_id: str
    version: int
    state: dict
    created_at: str

@dataclass(frozen=True)
class AppendResult:
    success: bool
    errors: tuple[str, ...]
    new_version: int
```
Pricing Engine Domain
```python
# core/pricing_engine/domain/models.py
from dataclasses import dataclass

@dataclass(frozen=True)
class RateContext:
    """Pre-loaded rate data for a pricing run. No DB access during evaluation."""
    substrate_rates: dict
    ink_rates: dict
    finishing_rates: dict
    labor_rates: dict
    quantity_breaks: dict
    markup_rules: dict
    loaded_at: str
    rate_table_versions: dict  # {table_name: version} for audit

@dataclass(frozen=True)
class NodePrice:
    node_id: str
    node_type: str
    base_cost: str  # Decimal as string
    quantity_adjusted: str
    markup: str
    total: str
    breakdown: dict  # Line-item details
    children_total: str  # Rolled-up children cost

@dataclass(frozen=True)
class PricingResult:
    success: bool
    errors: tuple[str, ...]
    total: str
    breakdown: dict  # {node_id: NodePrice}
    rate_context_versions: dict  # Which rate tables were used
```
14. Reference: Complete SQL Schema
The full schema from Section 8.3 is deployment-ready. Key design decisions:
| Decision | Rationale |
|---|---|
| One row per ConfigNode, not one document per product | Avoids TOAST compression, enables per-node indexing and RLS |
| `UNIQUE(stream_id, version)` on events | Optimistic concurrency — concurrent writers get a unique violation, must retry |
| Trigger-based immutability on events | Database enforces append-only semantics, not application code |
| JSONB attributes on nodes, columns for type | Hybrid: index/query the stable fields, JSONB for the variable parts |
| Projection tables are separate from events | Disposable and rebuildable; optimized for read patterns |
| `UUID[]` for children/parents | PostgreSQL native arrays; enables `ANY()` queries and GIN indexing |
| Rate tables with `valid_from`/`valid_until` | Temporal validity — reproduce historical quotes with the rates active at that time |
15. Glossary
| Term | Definition |
|---|---|
| DAG | Directed Acyclic Graph. Nodes connected by directed edges with no cycles. Products are modeled as DAGs where nodes are components and edges are dependencies. |
| ConfigNode | A single node in the product configuration graph. Represents a layer, process, assembly, or finishing step. |
| ProductConfig | A complete product configuration: a flat map of ConfigNodes with a designated root node. |
| Event Sourcing | Architecture pattern where state changes are recorded as immutable events. Current state is derived by replaying the event stream. |
| DomainEvent | An immutable record of something that happened. Contains a type, payload, timestamp, and metadata. |
| Event Stream | An ordered sequence of events for a single aggregate (e.g., all events for job “abc-123”). |
| Projection | A denormalized read model derived from events. Optimized for specific query patterns. Disposable and rebuildable. |
| CQRS | Command Query Responsibility Segregation. Separate write model (commands → events) from read model (projections). |
| Topological Sort | Linear ordering of DAG nodes that respects edge direction. Pricing uses the child-first (reverse topological) order, so every parent is evaluated after its children. |
| Memoization | Caching node evaluation results to prevent duplicate computation of shared DAG nodes (diamond dependencies). |
| TOAST | PostgreSQL’s Oversized-Attribute Storage Technique. Compresses values >2KB into a separate table. Avoid by keeping JSONB values small. |
| RLS | Row Level Security. PostgreSQL feature that automatically filters rows based on the current user’s identity. Foundation for multi-tenant isolation. |
| Strangler Fig | Migration pattern where new system runs parallel to legacy, gradually taking over functionality. Never big-bang. |
| Soft Code | Architecture discipline in tool-hub-try5: decisions separated from plumbing, enforced by file structure and automated checks. |
| Optimistic Concurrency | Conflict detection via version numbers. If two writers try to append version 5 to the same stream, one gets a unique constraint violation and must retry. |
| Rate Context | Pre-loaded bundle of all rate tables needed for a pricing run. Eliminates database access during evaluation. |
DAG Event Sourcing × Tool Hub Deep Dive · March 2026
Architecture study for tool-hub-try5 → Enterprise ERP MIS
Storage: Supabase Postgres + JSONB