DAG Event Sourcing × Tool Hub
Enterprise ERP MIS Deep Dive

March 2026 · Architecture study for tool-hub-try5 → Enterprise MIS evolution

DAG Product Model · Event Sourcing + CQRS · Supabase Postgres · Agentic Roadmap

1. Executive Summary

This document is the complete architectural reference for evolving tool-hub-try5 — a repository of 12 stateless, pure-function tools built with Soft Code discipline — into an enterprise-grade ERP MIS capable of modeling composable products, millisecond quoting, full production tracking, and eventually agentic AI orchestration.

The transformation strategy combines three architectural patterns:

  1. DAG Product Model — Products are directed acyclic graphs of typed nodes, not flat database rows. A trading card order with UV coating, security foil, and multiple layers becomes a graph of interconnected configuration nodes.
  2. Event Sourcing — Every state change is recorded as an immutable event. Instead of overwriting a job’s status field, you append JobCreated, NormalizationCompleted, SheetImposed events. Current state is derived by replaying the event stream.
  3. Supabase Postgres + JSONB — Managed PostgreSQL with JSONB for flexible product configuration storage, relational tables for stable schemas (rate tables, customers), and Row Level Security for multi-tenant isolation.
Key Insight
The existing Soft Code architecture — pure functions in core/, I/O in adapters/, dumb orchestrators, frozen dataclasses — is already aligned with event sourcing. Decisions are pure functions. Domain models are immutable. Orchestrators are sequence-only. The DAG and event layers are additive, not a rewrite.

2. The Problem Statement

What We Have

Twelve independent, stateless tools. Each takes input, produces output, forgets everything. They normalize spreadsheets, match art to RFQs, build imposition layouts, split PDFs, generate labels, and calculate quotes. Each is rigorously structured: frozen dataclasses, pure decision functions, zero-branching orchestrators, thin Flask adapters.

What an Enterprise MIS Needs

| Capability | Current State | Required State |
|---|---|---|
| Product modeling | Each tool has its own flat data shapes | Unified composable product graph (DAG) |
| Quoting | Single-shot order_quote tool | Millisecond DAG-traversal pricing engine |
| State persistence | None — stateless request/response | Full job lifecycle with audit trail |
| Tool composition | Independent tools, manual chaining | DAG-defined workflows: tool A → tool B → tool C |
| Production tracking | None | Event-sourced station tracking, operator assignment |
| Agentic orchestration | None | AI agent navigating workflow DAGs, deciding next actions |
| Multi-tenant | Single-user Flask apps | Row-level security, customer isolation |

The Core Gap

The gap is not in computation — your pure functions are excellent. The gap is in persistent structure: a shared domain model that connects tools, tracks state over time, and enables composition. DAG event sourcing provides that structure without compromising the existing architecture.

3. Current Architecture: Tool Hub Try5

3.1 The Soft Code Discipline

Every function in this codebase must either make a decision (pure function: data in → data out) or perform plumbing (I/O: read files, write to network). Never both. This rule is enforced by 40+ automated validation scripts.

Why This Matters for Event Sourcing
Event sourcing requires that state transitions are deterministic pure functions. Your decision functions already are. Event sourcing requires that side effects (persistence, notifications) happen at the edges. Your adapters already do this. The architecture is pre-adapted.

3.2 Canonical Tool Structure

core/<tool_name>/
├── DECISIONS.md                    # Business decisions (what, not how)
├── contracts.py                    # ToolInput, ToolOutput, run()
├── domain/
│   ├── models.py                   # Frozen dataclasses — plain data only
│   ├── specs.py                    # Constants, CONTRACTS_VERSION
│   └── schema_checklist.md         # Field types, defaults, error contract
├── examples/
│   ├── happy_path.json             # Valid input
│   └── invalid_path.json           # Error case
├── application/
│   ├── input_validation.py         # Payload → domain objects
│   ├── decision_logic.py           # Pure functions — the intellectual core
│   ├── output_mapping.py           # Domain objects → result dict
│   └── orchestrator.py             # Dumb sequencer — max 60 lines, ZERO branching
└── tests/
    └── test_decision_logic.py      # Pure function tests, no I/O

3.3 What Exists Today (12 Tools)

| Tool | Purpose | Origin | Adapter |
|---|---|---|---|
| sample_tool | Minimal reference example | New | |
| order_quote | Pricing calculation | New | |
| label_generator | Avery 5160 label layouts | Migrated | |
| imposition_tool | 13×19 press sheet imposition | Migrated | Flask |
| ud_rfqs_gdm | UD RFQ parsing + filtering | Migrated | Flask |
| ud_rfq_normalizer | UD RFQ normalization | Migrated | Flask |
| trading_card_sheet_normalizer | Multi-profile spreadsheet normalization | New | Flask + LLM |
| trading_card_subject_extractor | Vision-based text extraction | New | Flask + Ollama |
| card_art_rfq_matcher | RFQ-to-PDF page matching | New | Flask + Ollama |
| imposition_builder_18_up | 18-up imposition packaging | New | Flask + Ollama |
| pdf_splitter | PDF split planning | Migrated | Flask |

3.4 Strengths and Gaps

| Strength (Keep) | Gap (Add) |
|---|---|
| Pure functions, deterministic, testable | No persistent state between tool invocations |
| Frozen immutable dataclasses | No shared domain model across tools |
| Dumb orchestrators (zero branching) | No tool-to-tool composition |
| Thin adapters with separated I/O | No database persistence layer |
| Automated enforcement (40+ checks) | No audit trail or event history |
| Contract versioning in every tool | No multi-tenant isolation |
| Structured error handling (first-class outputs) | No workflow orchestration |

4. The DAG Product Model

4.1 Why DAGs, Not Trees

A tree requires every node to have exactly one parent. But real products violate this constantly. A lamination process applies to multiple layers. A UV coating covers multiple components. In a tree, you’d duplicate the lamination node under each layer — duplicating cost calculations and creating update anomalies.

A Directed Acyclic Graph allows shared nodes with multiple parents. The lamination node exists once, with edges from each layer that passes through it. No duplication. Correct cost rollup. One update to the lamination spec propagates everywhere.

Trading Card Product DAG Example

                ┌─────────────┐
                │ Card Order  │   (root assembly)
                │  qty: 500   │
                └──────┬──────┘
           ┌───────────┼───────────┐
           ▼           ▼           ▼
     ┌──────────┐ ┌──────────┐ ┌──────────┐
     │  Front   │ │   Back   │ │  Insert  │   (layer nodes)
     │ 4/0 CMYK │ │ 4/0 CMYK │ │ 1/0 Black│
     │ 14pt C2S │ │ 14pt C2S │ │ 80# text │
     └─────┬────┘ └─────┬────┘ └────┬─────┘
           │            │           │
           ▼            ▼           │
     ┌───────────────────────┐      │
     │      UV Coating       │      │   (shared process — DAG!)
     │   spot gloss, front   │      │
     └───────────┬───────────┘      │
                 │                  │
                 ▼                  ▼
     ┌───────────────────────────┐
     │        Lamination         │   (shared process — DAG!)
     │     matte, both sides     │
     └───────────┬───────────────┘
                 │
                 ▼
           ┌───────────┐
           │ Security  │
           │   Foil    │   (finishing process)
           │ hologram  │
           └───────────┘

4.2 Universal Node Schema

Every node in the product graph conforms to a single schema, regardless of whether it represents a layer, a process, or an assembly:

@dataclass(frozen=True)
class ConfigNode:
    """Universal product configuration node."""
    id: str                              # UUID
    type: str                            # 'layer', 'process', 'assembly', 'finishing'
    version: int                         # Optimistic concurrency
    attributes: dict                     # Open schema — type-specific properties
    children: tuple[str, ...]            # Child node IDs (directed edges down)
    parents: tuple[str, ...]             # Parent node IDs (directed edges up)
    pricing_function_id: str | None      # Which pricing function to invoke
    tracking: TrackingState | None       # Production tracking (optional)

@dataclass(frozen=True)
class TrackingState:
    status: str                          # 'pending', 'in_progress', 'complete'
    assigned_to: str | None
    station: str | None
    timestamps: dict                     # {'started': ..., 'completed': ...}

@dataclass(frozen=True)
class ProductConfig:
    """Complete product configuration — flat map of nodes."""
    id: str                              # Product config UUID
    root_node_id: str                    # Entry point for traversal
    nodes: dict[str, ConfigNode]         # {node_id: ConfigNode} — O(1) lookup
    version: int
    created_at: str
    metadata: dict                       # Customer, job name, etc.

Why a Flat Map, Not Nested Objects
The nodes dictionary stores every node by ID. Navigation follows children and parents references. This gives you O(1) node lookup, supports DAGs (shared nodes with multiple parents), and avoids deeply nested JSON that hits PostgreSQL TOAST compression thresholds.
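The shared-node structure can be sketched with a trimmed version of ConfigNode (a minimal illustration, not the full model above): Front and Back both point at a single Lamination node, which exists exactly once in the flat map.

```python
from dataclasses import dataclass, field

@dataclass(frozen=True)
class ConfigNode:
    """Trimmed ConfigNode — just enough fields to show node sharing."""
    id: str
    type: str
    attributes: dict = field(default_factory=dict)
    children: tuple[str, ...] = ()
    parents: tuple[str, ...] = ()

# Diamond: Front and Back both feed the single Lamination node.
lamination = ConfigNode(
    id="lam-1", type="process",
    attributes={"process_name": "lamination", "sides": "both"},
    parents=("front-1", "back-1"),
)
front = ConfigNode(
    id="front-1", type="layer",
    attributes={"substrate": "14pt C2S", "ink_colors": "4/0 CMYK"},
    children=("lam-1",),
)
back = ConfigNode(
    id="back-1", type="layer",
    attributes={"substrate": "14pt C2S", "ink_colors": "4/0 CMYK"},
    children=("lam-1",),
)

# Flat map: every node stored once, looked up by ID in O(1).
nodes = {n.id: n for n in (front, back, lamination)}

# Both layers reference the same lamination node — no duplication.
assert nodes["front-1"].children == nodes["back-1"].children == ("lam-1",)
```

Updating the lamination spec means replacing one entry in the map; every parent sees the change on the next traversal.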

4.3 Node Type Registry

Instead of hardcoding node types or requiring schema migrations for new product capabilities, a type registry defines what each node type can contain:

@dataclass(frozen=True)
class NodeTypeDefinition:
    type_name: str                       # 'layer', 'process', 'assembly'
    attribute_schema: dict               # JSON Schema for the attributes field
    allowed_child_types: tuple[str, ...] # What can this node contain?
    default_pricing_function: str | None # Default pricing strategy
    display_name: str                    # Human-readable label

# Example registry entries
LAYER_TYPE = NodeTypeDefinition(
    type_name="layer",
    attribute_schema={
        "required": ["substrate", "ink_colors", "sides"],
        "properties": {
            "substrate": {"type": "string"},
            "ink_colors": {"type": "string"},    # e.g., "4/0 CMYK"
            "sides": {"type": "integer", "enum": [1, 2]},
            "weight": {"type": "string"},         # e.g., "14pt C2S"
        }
    },
    allowed_child_types=("process", "finishing"),
    default_pricing_function="layer_pricing_v1",
    display_name="Print Layer",
)

PROCESS_TYPE = NodeTypeDefinition(
    type_name="process",
    attribute_schema={
        "required": ["process_name"],
        "properties": {
            "process_name": {"type": "string"},   # 'uv_coating', 'lamination'
            "coverage": {"type": "string"},        # 'full', 'spot'
            "sides": {"type": "string"},           # 'front', 'both'
        }
    },
    allowed_child_types=("process", "finishing"),
    default_pricing_function="process_pricing_v1",
    display_name="Process Step",
)

New product capabilities (embossing, die-cutting, foil stamping) are added by registering new type definitions — no code deployment, no database migration, no schema change.

4.4 Graph Operations (Pure Functions)

All graph manipulation lives in core/product_graph/application/ as pure functions:

# graph_ops.py — structural mutations (return new graph, never mutate)

def add_node(config: ProductConfig, node: ConfigNode) -> ProductConfig:
    """Add a node to the product configuration. Returns new config."""
    ...

def connect(config: ProductConfig, parent_id: str, child_id: str) -> ProductConfig:
    """Create a directed edge from parent to child. Returns new config."""
    ...

def remove_node(config: ProductConfig, node_id: str) -> ProductConfig:
    """Remove a node and all its edges. Returns new config."""
    ...

def validate_acyclic(config: ProductConfig) -> list[str]:
    """Detect cycles using three-color DFS. Returns list of errors (empty = valid)."""
    ...
# traversal.py — read-only graph analysis

def topological_sort(config: ProductConfig) -> list[str]:
    """Return node IDs in dependency order (children before parents)."""
    ...

def evaluate_bottom_up(config: ProductConfig, eval_fn) -> dict[str, Any]:
    """Post-order traversal with memoization for shared DAG nodes."""
    ...

def find_ancestors(config: ProductConfig, node_id: str) -> set[str]:
    """All nodes reachable by following parent edges."""
    ...

def find_descendants(config: ProductConfig, node_id: str) -> set[str]:
    """All nodes reachable by following child edges."""
    ...

Alignment with Soft Code
Every function above is pure: takes data in, returns data out, no I/O. They produce new ProductConfig instances (frozen dataclasses) rather than mutating in place. They live in core/, are tested without databases, and the orchestrator calls them in sequence with zero branching. Existing Soft Code discipline applies without modification.
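As a concrete sketch of two of these helpers — operating on a bare dict of child-ID lists rather than the full ProductConfig, to stay self-contained — a post-order DFS yields the children-before-parents order, and three-color DFS detects cycles:

```python
def topological_sort(children_of: dict[str, list[str]]) -> list[str]:
    """Post-order DFS: every child appears before any of its parents."""
    order: list[str] = []
    visited: set[str] = set()

    def visit(node_id: str) -> None:
        if node_id in visited:
            return
        visited.add(node_id)
        for child in children_of.get(node_id, []):
            visit(child)
        order.append(node_id)  # Appended only after all descendants

    for node_id in children_of:
        visit(node_id)
    return order

def validate_acyclic(children_of: dict[str, list[str]]) -> list[str]:
    """Three-color DFS: white = unseen, gray = on current path, black = done."""
    WHITE, GRAY, BLACK = 0, 1, 2
    color: dict[str, int] = {node_id: WHITE for node_id in children_of}
    errors: list[str] = []

    def visit(node_id: str) -> None:
        color[node_id] = GRAY
        for child in children_of.get(node_id, []):
            if color.get(child, WHITE) == GRAY:
                errors.append(f"cycle detected through {child}")
            elif color.get(child, WHITE) == WHITE:
                visit(child)
        color[node_id] = BLACK

    for node_id in children_of:
        if color[node_id] == WHITE:
            visit(node_id)
    return errors

# The diamond from section 4.1: shared lamination node, no cycles.
diamond = {"order": ["front", "back"], "front": ["lam"],
           "back": ["lam"], "lam": []}
order = topological_sort(diamond)
assert order.index("lam") < order.index("front") < order.index("order")
assert validate_acyclic(diamond) == []
assert validate_acyclic({"a": ["b"], "b": ["a"]}) != []
```

Both functions take data and return data, so they slot directly into core/ with plain unit tests.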

5. Event Sourcing

5.1 Why Immutable Events

Traditional MIS systems use mutable status fields: UPDATE jobs SET status = 'printing' WHERE id = 123. The previous state is destroyed. You cannot answer questions like: When did this job start printing? Who moved it there? What station was it on before? How long did each stage take?

Event sourcing records every state change as an immutable, append-only event. The event stream is the source of truth. Current state is a derived projection — a fold over the event stream.

# Instead of: UPDATE jobs SET status = 'printing'
# You append:
JobMovedToStation(
    job_id="job-123",
    station="heidelberg-xl-106",
    operator="maria.garcia",
    timestamp="2026-03-01T14:23:00Z",
    previous_station="prepress-2"
)

5.2 Event Anatomy

@dataclass(frozen=True)
class DomainEvent:
    """Base structure for all domain events."""
    event_id: str                        # UUID — globally unique
    stream_id: str                       # e.g., "job:abc-123" or "product:def-456"
    version: int                         # Monotonic within stream (optimistic concurrency)
    event_type: str                      # e.g., "JobCreated", "NodeAdded", "QuoteGenerated"
    timestamp: str                       # ISO 8601
    payload: dict                        # Event-specific data
    metadata: dict                       # Actor, correlation_id, causation_id

# Concrete event examples:

@dataclass(frozen=True)
class JobCreated:
    job_id: str
    customer_id: str
    product_config_id: str
    source_tool: str                     # Which tool initiated this
    source_files: tuple[str, ...]

@dataclass(frozen=True)
class NormalizationCompleted:
    job_id: str
    tool: str                            # "trading_card_sheet_normalizer"
    row_count: int
    warning_count: int
    error_count: int
    profiles_detected: tuple[str, ...]

@dataclass(frozen=True)
class QuoteGenerated:
    job_id: str
    product_config_id: str
    total_price: str                     # Decimal as string for precision
    line_items: tuple[dict, ...]
    rate_table_versions: dict            # Which rate tables were used

@dataclass(frozen=True)
class ProductionStageCompleted:
    job_id: str
    stage: str                           # "imposition", "printing", "cutting"
    operator: str
    station: str
    waste_sheets: int
    duration_seconds: int

5.3 Event Streams and Replay

Events are grouped into streams by aggregate root (typically a job or a product configuration). Each stream is an ordered sequence:

# Event stream for job "job-abc-123":
# v1: JobCreated          {customer: "Acme Cards", product_config: "cfg-789"}
# v2: NormalizationCompleted {rows: 45, warnings: 2}
# v3: ArtMatchCompleted   {matched: 43, unmatched: 2}
# v4: ImpositionBuilt     {sheets: 3, layout: "18-up"}
# v5: QuoteGenerated      {total: "1,234.50"}
# v6: QuoteApproved       {approved_by: "john.smith"}
# v7: ProductionStarted   {station: "prepress-1"}
# ...

# Current state = fold(events, initial_state, apply_fn)
def rebuild_job_state(events: list[DomainEvent]) -> JobState:
    """Replay events to reconstruct current state."""
    state = JobState.empty()
    for event in events:
        state = apply_event(state, event)
    return state

def apply_event(state: JobState, event: DomainEvent) -> JobState:
    """Pure function: apply one event to produce new state."""
    match event.event_type:
        case "JobCreated":
            return state.with_status("created").with_customer(event.payload["customer_id"])
        case "QuoteApproved":
            return state.with_status("approved")
        case "ProductionStarted":
            return state.with_status("in_production").with_station(event.payload["station"])
        case _:
            return state  # Unknown events are safely ignored

Important: apply_event is a PURE function
It takes state + event, returns new state. No I/O. No database. No side effects. This means it lives in core/event_store/application/decision_logic.py and is tested with plain unit tests. The Soft Code discipline applies perfectly.

5.4 Projections (Materialized Read Models)

While event streams are the source of truth, querying them directly is expensive for operational views. Projections are pre-computed read models maintained by asynchronously processing new events:

| Projection | Source Events | Purpose |
|---|---|---|
| Job Board | JobCreated, StatusChanged, StageCompleted | Dashboard showing all jobs with current status |
| Station Queue | ProductionStarted, StageCompleted | What’s waiting at each machine/station |
| Customer History | JobCreated, QuoteGenerated, QuoteApproved | All jobs for a customer, quotes, approvals |
| Pricing Audit | QuoteGenerated | Which rate tables were used, when, for what |
| Waste Report | StageCompleted | Waste sheets per station, operator, time period |

Projections are disposable and rebuildable. If a projection is corrupted or you change its schema, delete it and replay all events. The event stream is never lost.
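Rebuilding a projection is just a fold over the full event stream. A sketch with plain dicts standing in for events and projection rows (the specific event handling here is illustrative):

```python
def rebuild_job_board(events: list[dict]) -> dict[str, dict]:
    """Fold all events into a fresh job-board projection (job_id -> row)."""
    board: dict[str, dict] = {}
    for event in events:
        job_id = event["stream_id"]
        row = board.setdefault(job_id, {"status": None, "station": None})
        if event["event_type"] == "JobCreated":
            row["status"] = "created"
        elif event["event_type"] == "ProductionStarted":
            row["status"] = "in_production"
            row["station"] = event["payload"]["station"]
    return board

events = [
    {"stream_id": "job-1", "event_type": "JobCreated", "payload": {}},
    {"stream_id": "job-1", "event_type": "ProductionStarted",
     "payload": {"station": "prepress-1"}},
]
board = rebuild_job_board(events)
assert board["job-1"]["status"] == "in_production"
assert board["job-1"]["station"] == "prepress-1"
```

Because the fold starts from an empty dict every time, deleting a corrupted projection table and replaying is always safe.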

6. CQRS Pattern

6.1 Command Side (Writes)

Commands represent intentions: “create this job,” “approve this quote,” “move to next station.” The command handler validates, executes business logic (pure functions from core/), and emits events:

# Command flow:
# 1. Receive command (adapter layer — Flask route or API)
# 2. Load current state (replay events for this stream)
# 3. Validate command against current state (core decision function)
# 4. Execute business logic (core decision function)
# 5. Emit new events (adapter persists to event store)

def handle_approve_quote(cmd: ApproveQuoteCommand, events: list[DomainEvent]) -> list[DomainEvent]:
    """Pure function: given current events + command, return new events."""
    state = rebuild_job_state(events)
    errors = validate_approval(state, cmd)       # Can this quote be approved?
    if errors:
        return [CommandRejected(errors=errors)]
    return [QuoteApproved(
        job_id=cmd.job_id,
        approved_by=cmd.approver,
        approved_price=state.quoted_price,
    )]
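On the adapter side, the UNIQUE(stream_id, version) constraint in the events table is what enforces optimistic concurrency: an append at a stale version fails and the caller reloads and retries. The same rule, sketched against an in-memory store:

```python
class VersionConflict(Exception):
    """Another writer appended to the stream first — reload and retry."""

def append_events(store: dict[str, list[dict]], stream_id: str,
                  expected_version: int, new_events: list[dict]) -> None:
    """Append iff the stream is still at expected_version (mirrors the
    UNIQUE(stream_id, version) constraint on the events table)."""
    stream = store.setdefault(stream_id, [])
    if len(stream) != expected_version:
        raise VersionConflict(f"{stream_id} is at v{len(stream)}, "
                              f"expected v{expected_version}")
    for offset, event in enumerate(new_events, start=1):
        stream.append({**event, "version": expected_version + offset})

store: dict[str, list[dict]] = {}
append_events(store, "job:abc-123", 0, [{"event_type": "JobCreated"}])
append_events(store, "job:abc-123", 1, [{"event_type": "QuoteApproved"}])
assert [e["version"] for e in store["job:abc-123"]] == [1, 2]
```

In Postgres the conflict surfaces as a unique-constraint violation instead of an exception raised in Python, but the retry loop in the adapter is the same.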

6.2 Query Side (Reads)

Queries never touch the event store. They read from denormalized projection tables optimized for specific views:

# Query side — reads from projections, never from event store
# Each projection table is optimized for its use case:

# Job board: SELECT * FROM job_board WHERE status = 'in_production' ORDER BY priority
# Station:   SELECT * FROM station_queue WHERE station = 'heidelberg-xl-106'
# Customer:  SELECT * FROM customer_history WHERE customer_id = 'acme-cards'
# Agent:     SELECT * FROM job_board WHERE next_action IS NOT NULL

6.3 Eventual Consistency

Projections are updated asynchronously as events arrive. There is a small delay (typically 1–5 seconds) between an event being written and the projection reflecting it.

Is this acceptable? Yes. An operator seeing a job appear on their station queue 2 seconds after it was assigned is perfectly fine. A dashboard refreshing every 5 seconds is perfectly fine. Financial reports that run nightly are perfectly fine. The only thing that must be immediately consistent is the event stream itself — and it is, by definition (append-only with version constraints).

7. Pricing Engine

7.1 The Three-Phase Pattern: Pre-load, Evaluate, Aggregate

Legacy MIS systems price composable products by querying the database for each variable: substrate cost, ink cost, finishing cost, shipping cost — one query per variable per component. A complex card with 20 configuration nodes triggers 50–200+ database round trips.

The DAG pricing engine inverts this:

| Phase | I/O? | Operations | Queries |
|---|---|---|---|
| 1. Pre-load | Yes (adapter) | Load product config + all relevant rate tables | 1–3 total |
| 2. Evaluate | No (core) | Pure arithmetic on cached data, traverse DAG bottom-up | 0 |
| 3. Aggregate | No (core) | Parent nodes roll up children results | 0 |

This transforms quoting from I/O-bound (seconds) to CPU-bound (milliseconds).

7.2 Bottom-Up DAG Traversal with Memoization

def evaluate_pricing(config: ProductConfig, rate_context: RateContext) -> PricingResult:
    """Bottom-up (post-order) DAG traversal with memoization."""
    memo: dict[str, NodePrice] = {}
    order = topological_sort(config)     # Children before parents

    for node_id in order:
        node = config.nodes[node_id]
        if node_id in memo:
            continue                     # Already evaluated (DAG shared node)

        # Evaluate children first (guaranteed by topological order)
        child_prices = [memo[cid] for cid in node.children]

        # Look up pricing function from registry
        pricing_fn = get_pricing_function(node.pricing_function_id)

        # Pure computation — no database access
        node_price = pricing_fn(
            node=node,
            children=child_prices,
            rates=rate_context,
        )

        memo[node_id] = node_price

    return PricingResult(
        total=memo[config.root_node_id].total,
        breakdown=memo,
    )

Memoization for Diamond Dependencies
In the trading card DAG, the Lamination node has two parents (Front layer and Back layer). Without memoization, it would be evaluated twice. The memo cache ensures each node is computed exactly once, regardless of how many paths lead to it.
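A registry entry such as layer_pricing_v1 can be sketched as a pure function over the node, its already-priced children, and the pre-loaded rates. Field names and rate values here are illustrative, not the real rate-table schema:

```python
from decimal import Decimal

def layer_pricing_v1(node: dict, children: list[Decimal],
                     rates: dict[str, Decimal]) -> Decimal:
    """Price one layer node: own substrate cost plus already-priced children.
    Pure function — the rates dict was pre-loaded by the adapter (phase 1)."""
    substrate = node["attributes"]["substrate"]
    qty = node["attributes"].get("qty", 1)
    own_cost = rates[substrate] * qty
    return own_cost + sum(children, Decimal("0"))

rates = {"14pt C2S": Decimal("0.08")}           # illustrative per-unit rate
node = {"attributes": {"substrate": "14pt C2S", "qty": 500}}
price = layer_pricing_v1(node, children=[Decimal("12.50")], rates=rates)
assert price == Decimal("52.50")  # 500 * 0.08 + 12.50
```

Decimal (carried as strings in QuoteGenerated) avoids float rounding in money math; the function never touches the database, so it runs in microseconds per node.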

7.3 Three-Layer Caching Strategy

| Cache Layer | Scope | TTL / Invalidation | What It Caches |
|---|---|---|---|
| L1: Rate Tables | Application-wide | 1-hour TTL + explicit invalidation on update | Substrate prices, ink costs, finishing rates |
| L2: Subtree Pricing | Per-product | Hash of config document — invalidated on any mutation | Previously computed subtree results |
| L3: Node Memoization | Per-request | In-memory, discarded after request | Prevents redundant DAG node evaluation |
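The L1 layer can be as small as a timestamped holder around the rate-table loader. A sketch of the TTL-plus-explicit-invalidation behavior, not a production cache:

```python
import time

class RateTableCache:
    """L1 cache: holds rate tables for ttl_seconds, with explicit invalidation."""
    def __init__(self, loader, ttl_seconds: float = 3600.0):
        self._loader = loader              # I/O function: () -> dict
        self._ttl = ttl_seconds
        self._cached: dict | None = None
        self._loaded_at = 0.0

    def get(self) -> dict:
        expired = time.monotonic() - self._loaded_at > self._ttl
        if self._cached is None or expired:
            self._cached = self._loader()  # the only place I/O happens
            self._loaded_at = time.monotonic()
        return self._cached

    def invalidate(self) -> None:
        """Call when a rate table is updated."""
        self._cached = None

calls = []
cache = RateTableCache(loader=lambda: calls.append(1) or {"ink": 0.02})
assert cache.get() == cache.get() == {"ink": 0.02}
assert len(calls) == 1                     # second get() hit the cache
cache.invalidate()
cache.get()
assert len(calls) == 2                     # invalidation forced a reload
```

The cache lives in the adapter layer; core pricing functions only ever see the plain dict it returns, preserving the I/O boundary.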

8. Storage: Supabase Postgres + JSONB

8.1 Why This Stack

The recommended storage strategy is hybrid: JSONB documents for hierarchical product configurations, relational tables for stable reference data. Supabase provides exactly this as a managed service.

8.2 Hybrid Storage Strategy

| Data Type | Storage | Why |
|---|---|---|
| Product configurations (DAG nodes) | Relational rows + JSONB attributes | One row per node, JSONB for flexible properties |
| Event streams | Append-only table + JSONB payload | Immutable events, indexed by stream + version |
| Rate tables | Relational columns | Stable schema, frequently queried, indexed |
| Customers, operators | Relational columns | Standard entity tables, RLS policies |
| Projections | Denormalized tables + JSONB summary | Disposable read models, trigger-maintained |

The Key Insight
Frequently-queried fields get dedicated columns (indexed, typed, RLS-compatible). Variable, type-specific fields go in JSONB. This is the best-practice hybrid approach recommended by PostgreSQL experts: use JSONB as a “catch-all” for variable parts, columns for stable parts.

8.3 Complete Schema Design

-- ═══════════════════════════════════════════════════════════════
-- PRODUCT CONFIGURATION (DAG)
-- ═══════════════════════════════════════════════════════════════

-- One row per node, NOT one giant document per product.
-- Each node's JSONB attributes stays small (well under TOAST threshold).
CREATE TABLE config_nodes (
    id              UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    product_id      UUID NOT NULL,                -- Which product this node belongs to
    type            TEXT NOT NULL,                 -- 'layer', 'process', 'assembly', 'finishing'
    version         INT NOT NULL DEFAULT 1,        -- Optimistic concurrency
    attributes      JSONB NOT NULL DEFAULT '{}',   -- Type-specific properties (open schema)
    children        UUID[] DEFAULT '{}',           -- Child node IDs (directed edges)
    parents         UUID[] DEFAULT '{}',           -- Parent node IDs (directed edges)
    pricing_fn      TEXT,                          -- Pricing function identifier
    created_at      TIMESTAMPTZ DEFAULT now(),
    updated_at      TIMESTAMPTZ DEFAULT now()
);

-- Product-level aggregation
CREATE TABLE products (
    id              UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    customer_id     UUID NOT NULL REFERENCES customers(id),
    name            TEXT NOT NULL,
    root_node_id    UUID REFERENCES config_nodes(id),
    version         INT NOT NULL DEFAULT 1,
    status          TEXT NOT NULL DEFAULT 'draft',  -- 'draft', 'quoted', 'approved', 'in_production'
    metadata        JSONB DEFAULT '{}',             -- Job name, PO number, etc.
    created_at      TIMESTAMPTZ DEFAULT now(),
    updated_at      TIMESTAMPTZ DEFAULT now()
);

-- Indexes
CREATE INDEX idx_nodes_product ON config_nodes(product_id);
CREATE INDEX idx_nodes_type ON config_nodes(type);
CREATE INDEX idx_nodes_attrs ON config_nodes USING GIN(attributes);
CREATE INDEX idx_products_customer ON products(customer_id);
CREATE INDEX idx_products_status ON products(status);


-- ═══════════════════════════════════════════════════════════════
-- NODE TYPE REGISTRY
-- ═══════════════════════════════════════════════════════════════

CREATE TABLE node_types (
    type_name       TEXT PRIMARY KEY,
    display_name    TEXT NOT NULL,
    attribute_schema JSONB NOT NULL,               -- JSON Schema for attributes validation
    allowed_children TEXT[] DEFAULT '{}',           -- Allowed child type_names
    default_pricing TEXT,                           -- Default pricing function ID
    created_at      TIMESTAMPTZ DEFAULT now()
);


-- ═══════════════════════════════════════════════════════════════
-- IMMUTABLE EVENT LOG
-- ═══════════════════════════════════════════════════════════════

CREATE TABLE events (
    id              UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    stream_id       TEXT NOT NULL,                 -- e.g., 'job:abc-123', 'product:def-456'
    version         INT NOT NULL,                  -- Monotonic within stream
    event_type      TEXT NOT NULL,                 -- e.g., 'JobCreated', 'QuoteGenerated'
    payload         JSONB NOT NULL,                -- Event-specific data
    metadata        JSONB DEFAULT '{}',            -- Actor, correlation_id, causation_id
    created_at      TIMESTAMPTZ DEFAULT now(),
    UNIQUE(stream_id, version)                     -- Optimistic concurrency control
);

CREATE INDEX idx_events_stream ON events(stream_id, version);
CREATE INDEX idx_events_type ON events(event_type);
CREATE INDEX idx_events_created ON events(created_at);

-- Prevent mutation: events are append-only
CREATE OR REPLACE FUNCTION prevent_event_mutation()
RETURNS TRIGGER AS $$
BEGIN
    RAISE EXCEPTION 'Events are immutable. Cannot update or delete.';
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER events_immutable
    BEFORE UPDATE OR DELETE ON events
    FOR EACH ROW EXECUTE FUNCTION prevent_event_mutation();


-- ═══════════════════════════════════════════════════════════════
-- RATE TABLES (Relational — Stable Schema)
-- ═══════════════════════════════════════════════════════════════

CREATE TABLE rate_tables (
    id              UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    name            TEXT NOT NULL,                 -- 'substrate_costs', 'ink_rates', 'finishing_rates'
    category        TEXT NOT NULL,                 -- 'material', 'labor', 'equipment', 'finishing'
    rates           JSONB NOT NULL,                -- Structured pricing data
    quantity_breaks JSONB,                         -- Quantity-based pricing tiers
    valid_from      TIMESTAMPTZ NOT NULL,
    valid_until     TIMESTAMPTZ,                   -- NULL = currently active
    version         INT NOT NULL DEFAULT 1,
    created_at      TIMESTAMPTZ DEFAULT now(),
    updated_at      TIMESTAMPTZ DEFAULT now()
);

CREATE INDEX idx_rates_category ON rate_tables(category);
CREATE INDEX idx_rates_valid ON rate_tables(valid_from, valid_until);


-- ═══════════════════════════════════════════════════════════════
-- PROJECTIONS (Denormalized Read Models — Disposable & Rebuildable)
-- ═══════════════════════════════════════════════════════════════

-- Job board projection
CREATE TABLE proj_job_board (
    job_id          TEXT PRIMARY KEY,
    customer_name   TEXT,
    product_name    TEXT,
    status          TEXT NOT NULL,
    current_station TEXT,
    assigned_to     TEXT,
    quoted_price    NUMERIC(12,2),
    priority        INT DEFAULT 0,
    summary         JSONB DEFAULT '{}',            -- Aggregated stats
    last_event_at   TIMESTAMPTZ,
    updated_at      TIMESTAMPTZ DEFAULT now()
);

-- Station queue projection
CREATE TABLE proj_station_queue (
    id              UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    station         TEXT NOT NULL,
    job_id          TEXT NOT NULL,
    stage           TEXT NOT NULL,
    priority        INT DEFAULT 0,
    queued_at       TIMESTAMPTZ DEFAULT now(),
    estimated_duration_min INT
);
CREATE INDEX idx_station_queue ON proj_station_queue(station, priority);

-- Customer history projection
CREATE TABLE proj_customer_history (
    id              UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    customer_id     UUID NOT NULL,
    job_id          TEXT NOT NULL,
    event_type      TEXT NOT NULL,
    summary         JSONB,
    occurred_at     TIMESTAMPTZ
);
CREATE INDEX idx_customer_history ON proj_customer_history(customer_id, occurred_at);


-- ═══════════════════════════════════════════════════════════════
-- REFERENCE TABLES
-- ═══════════════════════════════════════════════════════════════

CREATE TABLE customers (
    id              UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    name            TEXT NOT NULL,
    email           TEXT,
    metadata        JSONB DEFAULT '{}',
    created_at      TIMESTAMPTZ DEFAULT now()
);

CREATE TABLE operators (
    id              UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    name            TEXT NOT NULL,
    station         TEXT,
    role            TEXT DEFAULT 'operator',
    active          BOOLEAN DEFAULT true,
    created_at      TIMESTAMPTZ DEFAULT now()
);


-- ═══════════════════════════════════════════════════════════════
-- ROW LEVEL SECURITY (Multi-Tenant Foundation)
-- ═══════════════════════════════════════════════════════════════

ALTER TABLE products ENABLE ROW LEVEL SECURITY;
ALTER TABLE config_nodes ENABLE ROW LEVEL SECURITY;
ALTER TABLE events ENABLE ROW LEVEL SECURITY;

-- Example policy: users see only their organization's products
-- (Uncomment and customize when multi-tenant is needed)
-- CREATE POLICY products_tenant_isolation ON products
--     USING (metadata->>'tenant_id' = current_setting('app.current_tenant'));

8.4 TOAST and JSONB Performance

PostgreSQL applies TOAST (The Oversized-Attribute Storage Technique) compression to values exceeding ~2KB. TOAST-compressed values are stored in a separate table, requiring additional disk seeks on read.

The TOAST Trap
If you store an entire product configuration as one giant JSONB document (all 20+ nodes in a single column), you will exceed 2KB on any non-trivial product. Every read requires decompression from TOAST storage. Performance degrades.

The solution: one row per node. The schema above stores each ConfigNode as a separate row. The attributes JSONB per node is small (100–500 bytes typically). The flat-map is reconstructed in Python:

# Adapter layer: load product config from Supabase
def load_product_config(product_id: str) -> ProductConfig:
    """Load all nodes for a product and assemble into ProductConfig."""
    product = supabase.table("products").select("*").eq("id", product_id).single().execute().data
    nodes = supabase.table("config_nodes").select("*").eq("product_id", product_id).execute()

    # Build flat-map in Python (core pure function)
    node_map = {row["id"]: to_config_node(row) for row in nodes.data}

    return ProductConfig(
        id=product["id"],
        root_node_id=product["root_node_id"],
        nodes=node_map,
        version=product["version"],
        created_at=product["created_at"],
        metadata=product["metadata"],
    )
    # Total queries: 2 (product + nodes). Regardless of graph complexity.

8.5 Row Level Security for Multi-Tenant

When you need customer isolation (customer A cannot see customer B’s products), Supabase’s Row Level Security adds a WHERE clause to every query automatically:

-- Every query on products automatically gets:
-- WHERE metadata->>'tenant_id' = auth.jwt()->>'tenant_id'
-- No code changes needed — the database enforces isolation.

This is critical for an enterprise MIS. You add multi-tenant isolation at the database level, not in application code. Every query, every projection, every event stream is automatically scoped.

8.6 Realtime and Edge Functions: Caveats

Supabase Realtime

Supabase Edge Functions

The Right Pattern for Projections

-- Use PostgreSQL triggers to maintain projections server-side.
-- This runs in the database, not in Edge Functions or Realtime.

CREATE OR REPLACE FUNCTION update_job_board_projection()
RETURNS TRIGGER AS $$
BEGIN
    INSERT INTO proj_job_board (job_id, status, last_event_at, updated_at)
    VALUES (
        NEW.stream_id,
        NEW.payload->>'status',
        NEW.created_at,
        now()
    )
    ON CONFLICT (job_id)
    DO UPDATE SET
        status = COALESCE(NEW.payload->>'status', proj_job_board.status),
        last_event_at = NEW.created_at,
        updated_at = now();
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER trg_update_job_board
    AFTER INSERT ON events
    FOR EACH ROW
    WHEN (NEW.event_type IN ('JobCreated', 'StatusChanged', 'StageCompleted'))
    EXECUTE FUNCTION update_job_board_projection();

8.7 Migration Path

Supabase is standard PostgreSQL. If you outgrow the managed service:

  1. pg_dump your entire database
  2. pg_restore to any Postgres instance (AWS RDS, DigitalOcean, self-hosted)
  3. Update connection strings in your adapter layer
  4. Core never changes — it doesn’t know Supabase exists

9. Integration Architecture: DAG ES + Tool Hub

9.1 Layer Mapping

How DAG Event Sourcing Maps to Soft Code Layers

┌────────────────────────────────────────────────────────────────────────────┐
│ core/  (pure functions, zero I/O, frozen dataclasses)                      │
│                                                                            │
│   product_graph/    ConfigNode, DAG ops, traversal, validation             │
│   event_store/      DomainEvent, append, replay, apply_event               │
│   pricing_engine/   Bottom-up evaluate, rate context, memoization          │
│   job_workflow/     Workflow DAG definition, stage sequencing              │
│   existing tools    normalizer, imposer, splitter, matcher, quoter         │
│                                                                            │
│   All functions: data in → data out. No imports of Flask, supabase, os.    │
└──────────────────────────────────┬─────────────────────────────────────────┘
                                   │ contracts.run(ToolInput(...))
┌──────────────────────────────────▼─────────────────────────────────────────┐
│ adapters/                                                                  │
│                                                                            │
│   flask/      UI routes (existing tools, unchanged)                        │
│   supabase/   NEW persistence adapter                                      │
│     ├── graph_persistence.py   Load/save config_nodes, products            │
│     ├── event_persistence.py   Append events, load streams                 │
│     ├── rate_loader.py         Batch-load rate tables into RateContext     │
│     ├── projection_reader.py   Read from projection tables                 │
│     └── connection.py          Supabase client setup                       │
│                                                                            │
│   Adapters call core via contracts. Supabase adapter handles all DB I/O.   │
└──────────────────────────────────┬─────────────────────────────────────────┘
                                   │ supabase-py client
┌──────────────────────────────────▼─────────────────────────────────────────┐
│ Supabase Postgres                                                          │
│                                                                            │
│   config_nodes   One row per node, JSONB attributes                        │
│   products       Product-level metadata, root_node_id                      │
│   events         Immutable append-only, JSONB payload                      │
│   rate_tables    Relational, indexed, versioned                            │
│   node_types     Type registry with JSON Schema                            │
│   proj_*         Trigger-maintained read models                            │
│   customers      RLS-isolated reference data                               │
└────────────────────────────────────────────────────────────────────────────┘

9.2 Tool Composition via Workflow DAG

Today, your tools are independent. With a workflow DAG, they compose into production pipelines:

Example: Trading Card Production Workflow DAG

            ┌────────────────────────┐
            │ Normalize RFQ          │   trading_card_sheet_normalizer
            │ Input:  XLSX/CSV       │
            │ Output: canonical rows │
            └───────────┬────────────┘
                        │
                ┌───────┴────────┐
                ▼                ▼
        ┌─────────────┐   ┌──────────────┐
        │ Match Art   │   │ Quote Order  │   card_art_rfq_matcher / pricing_engine
        │ to Subjects │   │              │
        └──────┬──────┘   └──────┬───────┘
               │                 │
               ▼                 ▼
        ┌─────────────┐   ┌──────────────┐
        │ Build       │   │ Await        │
        │ Imposition  │   │ Approval     │   (human decision gate)
        └──────┬──────┘   └──────┬───────┘
               │                 │
               ▼                 ▼
        ┌─────────────┐   ┌──────────────┐
        │ Split PDF   │   │ Generate     │   pdf_splitter / label_generator
        │             │   │ Labels       │
        └──────┬──────┘   └──────┬───────┘
               │                 │
               └────────┬────────┘
                        ▼
                 ┌─────────────┐
                 │ Production  │
                 │ Complete    │
                 └─────────────┘

Each node in this workflow DAG is a tool invocation. Edges define data flow. The core/job_workflow/ tool validates the DAG, determines execution order, and tracks completion. This is the foundation for agentic behavior — an AI agent reads this graph, checks event state, and decides what to run next.
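
The "determines execution order" step is a topological sort of the workflow DAG. A minimal sketch using Kahn's algorithm — the `{node: dependencies}` input shape and function name are illustrative, not the actual core/job_workflow/ API:

```python
from collections import deque

def execution_order(deps: dict[str, set[str]]) -> list[str]:
    """Return nodes ordered so every dependency runs before its dependents.

    deps maps each workflow node to the set of nodes it depends on.
    Raises ValueError if the graph contains a cycle (i.e. is not a DAG).
    """
    remaining = {node: set(d) for node, d in deps.items()}
    ready = deque(sorted(n for n, d in remaining.items() if not d))
    order: list[str] = []
    while ready:
        node = ready.popleft()
        order.append(node)
        # Release any node whose last unmet dependency was just completed.
        for other, d in remaining.items():
            if node in d:
                d.discard(node)
                if not d and other not in order and other not in ready:
                    ready.append(other)
    if len(order) != len(remaining):
        raise ValueError("workflow graph contains a cycle")
    return order
```

An agent (or a dumb orchestrator) walks this order, skipping nodes whose completion events already exist in the stream.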

9.3 Contract Evolution

Existing tool contracts (ToolInput / ToolOutput) gain optional event metadata:

# Enhanced ToolOutput — backward compatible
@dataclass(frozen=True)
class ToolOutput:
    success: bool
    result: dict
    errors: list[str]
    warnings: list[str]
    contracts_version: str

    # NEW — optional, backward-compatible additions:
    events: tuple[dict, ...] = ()       # Domain events emitted by this run
    job_id: str | None = None            # If this run is part of a tracked job
    correlation_id: str | None = None    # For tracing across tool invocations

Existing tools continue working with events=(). New tools or enhanced versions emit events. The adapter layer decides whether to persist them.

9.4 Adapter Extensions

The new adapters/supabase/ directory follows Soft Code conventions:

adapters/supabase/
├── connection.py               # Supabase client initialization
├── graph_persistence.py        # Load/save ProductConfig ↔ config_nodes table
├── event_persistence.py        # Append events, load streams, optimistic concurrency
├── rate_loader.py              # Batch-load rate tables into RateContext
├── projection_reader.py        # Read from proj_* tables
├── RUNTIME_DEPENDENCIES.md     # supabase-py, python-dotenv
└── tests/
    └── test_persistence.py     # Integration tests (requires Supabase connection)

Flask adapters remain unchanged. They gain an optional integration point: after calling run_contract(), they can pass events to the Supabase adapter for persistence. But this is additive — existing tools work without it.

10. Phased Roadmap

Following the Strangler Fig migration pattern from the article: build incrementally, validate at each stage, never attempt big-bang migration.

Phase 0: Foundation (Build First)

Goal: Create the structural primitives. No database yet. Pure functions only.

New Core Tool        | Purpose                                                    | Deliverables
core/product_graph/  | ConfigNode DAG — add, remove, connect, validate, traverse  | DECISIONS.md, models.py, graph_ops.py, traversal.py, tests
core/event_store/    | Immutable event log — append, replay, apply, project       | DECISIONS.md, models.py, event_log.py, tests
core/pricing_engine/ | Bottom-up DAG evaluation with pre-loaded rate context      | DECISIONS.md, models.py, evaluator.py, tests

Key constraint: Each tool follows the existing 6-step workflow. All pass preflight.py. Zero I/O in core. No database dependency.

Phase 1: Connect (Persist & Compose)

Goal: Connect foundation to Supabase. Existing tools start emitting events.

Phase 2: Track (Event Sourcing for Production)

Goal: Full production lifecycle tracking with audit trails.

Phase 3: Agentic Orchestration

Goal: AI agent that navigates workflow DAGs and decides next actions.

11. Trade-offs and Risks

Advantages

Benefit | Impact
Flexible product schemas | New product types without code deployment or schema migration
Millisecond quoting | 1–3 queries total, CPU-bound arithmetic instead of I/O-bound lookups
Complete audit trails | Every decision, every state change, every actor recorded permanently
Tool composability | Workflow DAGs define how tools chain together
Replay and debugging | Reproduce any historical state by replaying events to that point
Multi-tenant ready | RLS at database level, not application code
Agentic foundation | Event state + workflow DAGs are perfectly structured for AI agent consumption

Costs and Risks

Cost | Mitigation
Event schema evolution — once emitted, changing event shapes requires migration | Version events from day one. Use upcasting (transform old events to new schema on read). Keep payloads backward-compatible.
Operational complexity — event consumers, projections, eventual consistency | Start with Postgres triggers (simple, in-database). Add external consumers only when needed.
Debugging surface area — tracing event streams instead of call stacks | Correlation IDs on every event. Build an event explorer tool early.
Eventual consistency — projections may lag 1–5 seconds behind events | Acceptable for operational views. Only the event stream must be immediately consistent.
Database dependency — moving from stateless to stateful | Supabase is managed. Core remains pure and testable without database. Only adapters touch DB.
Specialized talent — event sourcing requires a different mental model | This document. Start simple. Build incrementally. Phase 0 requires no event sourcing knowledge.
Ad-hoc queries harder — event stores are less ergonomic than flat tables for analytics | Projections are denormalized tables — query them like normal SQL. The event store is for writes + replay.
The article’s honest assessment: “The architecture excels for composable product quoting and tracking but is overkill for flat, uniform products with low quote volume.” Your trading card products are composable (layers, finishes, variable configs). Your quote volume is growing. This architecture is appropriate for your domain.

12. What Changes, What Stays

What Does NOT Change

What DOES Change

13. Reference: Complete Data Models

Product Graph Domain

# core/product_graph/domain/models.py

from dataclasses import dataclass

@dataclass(frozen=True)
class ConfigNode:
    id: str
    type: str
    version: int
    attributes: dict
    children: tuple[str, ...]
    parents: tuple[str, ...]
    pricing_function_id: str | None = None
    tracking: "TrackingState | None" = None

@dataclass(frozen=True)
class TrackingState:
    status: str
    assigned_to: str | None = None
    station: str | None = None
    timestamps: dict | None = None

@dataclass(frozen=True)
class ProductConfig:
    id: str
    root_node_id: str
    nodes: dict  # {str: ConfigNode} — use dict for frozen compatibility
    version: int
    created_at: str
    metadata: dict

@dataclass(frozen=True)
class NodeTypeDefinition:
    type_name: str
    display_name: str
    attribute_schema: dict
    allowed_child_types: tuple[str, ...]
    default_pricing_function: str | None = None
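
A structural invariant these models must uphold is acyclicity of the child links. A minimal sketch of the kind of check core/product_graph/ would ship — `Node` is a stand-in with only the fields the check needs, and `validate_acyclic` is illustrative, not the actual graph_ops.py API:

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class Node:
    """Minimal stand-in for ConfigNode (id + children only)."""
    id: str
    children: tuple[str, ...]

def validate_acyclic(nodes: dict) -> list[str]:
    """Return error strings; an empty list means the child links form a DAG.

    Depth-first walk over .children with three-colour marking:
    a GREY node reached again means a back edge, i.e. a cycle.
    """
    WHITE, GREY, BLACK = 0, 1, 2
    colour = {node_id: WHITE for node_id in nodes}
    errors: list[str] = []

    def visit(node_id: str) -> None:
        colour[node_id] = GREY
        for child in nodes[node_id].children:
            if child not in nodes:
                errors.append(f"dangling edge: {node_id} -> {child}")
            elif colour[child] == GREY:
                errors.append(f"cycle through {child}")
            elif colour[child] == WHITE:
                visit(child)
        colour[node_id] = BLACK

    for node_id in nodes:
        if colour[node_id] == WHITE:
            visit(node_id)
    return errors
```

Note that a diamond (two parents sharing one child) is legal: the shared child is simply reached once as WHITE and later seen as BLACK, which is not an error.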

Event Store Domain

# core/event_store/domain/models.py

from dataclasses import dataclass

@dataclass(frozen=True)
class DomainEvent:
    event_id: str
    stream_id: str
    version: int
    event_type: str
    timestamp: str
    payload: dict
    metadata: dict

@dataclass(frozen=True)
class EventStream:
    stream_id: str
    events: tuple  # tuple[DomainEvent, ...]
    current_version: int

@dataclass(frozen=True)
class Snapshot:
    stream_id: str
    version: int
    state: dict
    created_at: str

@dataclass(frozen=True)
class AppendResult:
    success: bool
    errors: tuple[str, ...]
    new_version: int
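
Deriving state from these models is a pure fold over the stream. A hedged sketch — the event types and payload fields here (`JobCreated`, `StatusChanged`) are illustrative, not a fixed event catalogue:

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class DomainEvent:
    event_id: str
    stream_id: str
    version: int
    event_type: str
    timestamp: str
    payload: dict
    metadata: dict

def apply_event(state: dict, event: DomainEvent) -> dict:
    """Pure fold step: return a NEW state dict, never mutate the input."""
    if event.event_type == "JobCreated":
        return {**state, "status": "created", **event.payload}
    if event.event_type == "StatusChanged":
        return {**state, "status": event.payload["status"]}
    return state  # unknown event types are ignored, not errors

def replay(events: tuple) -> dict:
    """Derive current state by replaying the stream in version order."""
    state: dict = {}
    for event in sorted(events, key=lambda e: e.version):
        state = apply_event(state, event)
    return state
```

Because `apply_event` never mutates, replaying the first N events reproduces the exact state at version N — the basis for time-travel debugging.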

Pricing Engine Domain

# core/pricing_engine/domain/models.py

from dataclasses import dataclass

@dataclass(frozen=True)
class RateContext:
    """Pre-loaded rate data for a pricing run. No DB access during evaluation."""
    substrate_rates: dict
    ink_rates: dict
    finishing_rates: dict
    labor_rates: dict
    quantity_breaks: dict
    markup_rules: dict
    loaded_at: str
    rate_table_versions: dict   # {table_name: version} for audit

@dataclass(frozen=True)
class NodePrice:
    node_id: str
    node_type: str
    base_cost: str              # Decimal as string
    quantity_adjusted: str
    markup: str
    total: str
    breakdown: dict             # Line-item details
    children_total: str         # Rolled-up children cost

@dataclass(frozen=True)
class PricingResult:
    success: bool
    errors: tuple[str, ...]
    total: str
    breakdown: dict             # {node_id: NodePrice}
    rate_context_versions: dict # Which rate tables were used
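
The bottom-up evaluation with memoization that these models support can be sketched in a few lines — an illustrative stand-in for evaluator.py, collapsing each node's pricing to a pre-computed own-cost `Decimal` rather than a real rate lookup:

```python
from decimal import Decimal

def evaluate(node_id: str, children: dict, unit_costs: dict, memo=None) -> Decimal:
    """Bottom-up DAG pricing with memoization (illustrative sketch).

    children:   {node_id: tuple of child node ids}
    unit_costs: {node_id: Decimal own-cost for that node}
    A node shared via a diamond dependency is computed exactly once
    (memoized), then its cached subtree total is reused by each parent.
    """
    if memo is None:
        memo = {}
    if node_id in memo:
        return memo[node_id]
    children_total = sum(
        (evaluate(c, children, unit_costs, memo) for c in children.get(node_id, ())),
        Decimal("0"),
    )
    memo[node_id] = unit_costs[node_id] + children_total
    return memo[node_id]
```

Note the pure-function shape: everything the evaluation needs (the graph and the rates) is passed in, matching the RateContext design — no database access during the run.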

14. Reference: Complete SQL Schema

The full schema from Section 8.3 is deployment-ready. Key design decisions:

Decision | Rationale
One row per ConfigNode, not one document per product | Avoids TOAST compression, enables per-node indexing and RLS
UNIQUE(stream_id, version) on events | Optimistic concurrency — concurrent writers get a unique violation, must retry
Trigger-based immutability on events | Database enforces append-only semantics, not application code
JSONB attributes on nodes, columns for type | Hybrid: index/query the stable fields, JSONB for the variable parts
Projection tables are separate from events | Disposable and rebuildable; optimized for read patterns
UUID[] for children/parents | PostgreSQL native arrays; enables ANY() queries and GIN indexing
Rate tables with valid_from/valid_until | Temporal validity — reproduce historical quotes with the rates active at that time
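
The UNIQUE(stream_id, version) decision above implies a small retry loop in the event-persistence adapter. A hedged sketch — `insert_event`, `load_current_version`, and the `VersionConflict` exception are stand-ins for the real Supabase adapter functions and the driver's unique-violation error:

```python
class VersionConflict(Exception):
    """Stand-in for the unique-violation error the database driver raises."""

def append_with_retry(insert_event, load_current_version, stream_id, payload, max_retries=3):
    """Optimistic concurrency: claim version head+1; on a
    UNIQUE(stream_id, version) conflict, re-read the head and try again."""
    for _ in range(max_retries):
        version = load_current_version(stream_id) + 1
        try:
            insert_event(stream_id, version, payload)
            return version
        except VersionConflict:
            continue  # another writer claimed this version; re-read and retry
    raise RuntimeError(f"gave up after {max_retries} conflicts on {stream_id}")
```

Conflicts are expected to be rare (two writers on the same stream in the same instant), so a small bounded retry is usually enough; persistent conflicts surface as a hard error rather than silently spinning.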

15. Glossary

Term | Definition
DAG | Directed Acyclic Graph. Nodes connected by directed edges with no cycles. Products are modeled as DAGs where nodes are components and edges are dependencies.
ConfigNode | A single node in the product configuration graph. Represents a layer, process, assembly, or finishing step.
ProductConfig | A complete product configuration: a flat map of ConfigNodes with a designated root node.
Event Sourcing | Architecture pattern where state changes are recorded as immutable events. Current state is derived by replaying the event stream.
DomainEvent | An immutable record of something that happened. Contains a type, payload, timestamp, and metadata.
Event Stream | An ordered sequence of events for a single aggregate (e.g., all events for job “abc-123”).
Projection | A denormalized read model derived from events. Optimized for specific query patterns. Disposable and rebuildable.
CQRS | Command Query Responsibility Segregation. Separate write model (commands → events) from read model (projections).
Topological Sort | Ordering of DAG nodes such that every parent appears after its children. Used for bottom-up pricing evaluation.
Memoization | Caching node evaluation results to prevent duplicate computation of shared DAG nodes (diamond dependencies).
TOAST | PostgreSQL’s Oversized-Attribute Storage Technique. Compresses values >2KB into a separate table. Avoid by keeping JSONB values small.
RLS | Row Level Security. PostgreSQL feature that automatically filters rows based on the current user’s identity. Foundation for multi-tenant isolation.
Strangler Fig | Migration pattern where the new system runs parallel to the legacy one, gradually taking over functionality. Never big-bang.
Soft Code | Architecture discipline in tool-hub-try5: decisions separated from plumbing, enforced by file structure and automated checks.
Optimistic Concurrency | Conflict detection via version numbers. If two writers try to append version 5 to the same stream, one gets a unique constraint violation and must retry.
Rate Context | Pre-loaded bundle of all rate tables needed for a pricing run. Eliminates database access during evaluation.

DAG Event Sourcing × Tool Hub Deep Dive · March 2026
Architecture study for tool-hub-try5 → Enterprise ERP MIS
Storage: Supabase Postgres + JSONB